Experimenting with RabbitMQ – Fanout exchange


Preamble

This is part 3 of a series of blog posts about RabbitMQ. The series aims to provide more information about a series of posts by Derek Greer. I’m a beginner at RabbitMQ, so I cannot vouch for the accuracy of everything here.

Part 1: Experimenting with RabbitMQ – HelloWorldExample

Part 2: Experimenting with RabbitMQ – LoggingApplication example

Part 3: Experimenting with RabbitMQ – Fanout exchange

Part 4: Experimenting with RabbitMQ – Topic exchange

————————

Here are some experiments I carried out based upon this post. My version of the code, which you can obtain here, is as follows:

Producer

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Linq;
using RabbitMQ.Client;

namespace Producer
{
    class Program
    {
        private static volatile bool _cancelling;
        private const string Exchange = "fanout-exchange-example";
        static void Main()
        {
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();
            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);

            var thread = new Thread(() => PublishQuotes(channel));
            thread.Start();

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();

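            _cancelling = true;
            // Wait for the publisher thread to notice the flag and finish its current
            // cycle (this can take a few seconds) so it never publishes on a closed channel.
            thread.Join();

            channel.Close();
            connection.Close();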
        }

        private static void PublishQuotes(IModel channel)
        {
            while (true)
            {
                if (_cancelling)
                {
                    return;
                }
                IEnumerable<string> quotes = FetchStockQuotes(new[] { "GOOG", "HD", "MCD" });

                foreach (var quote in quotes)
                {
                    byte[] message = Encoding.UTF8.GetBytes(quote);
                    // Publish with an empty routing key; a fanout exchange ignores it anyway.
                    channel.BasicPublish(Exchange, "", null, message);
                }
                Thread.Sleep(5000);
            }
        }

        private static IEnumerable<string> FetchStockQuotes(string[] symbols)
        {
            var quotes = new List<string>();

            string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys",
                String.Join("%2C", symbols.Select(s => "%22" + s + "%22")));
            var wc = new WebClient { Proxy = WebRequest.DefaultWebProxy };
            var ms = new MemoryStream(wc.DownloadData(url));
            var reader = new XmlTextReader(ms);
            XDocument doc = XDocument.Load(reader);
            XElement results = doc.Root.Element("results");

            foreach (string symbol in symbols)
            {
                XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol);
                quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
            }

            return quotes;
        }
    }
}

Consumer

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    class Program
    {
        private const string Exchange = "fanout-exchange-example";
        private const string Queue = "quotes";

        static void Main()
        {
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();

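            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
            // Arguments: queue name, durable = false, exclusive = false, autoDelete = true, no extra arguments.
            channel.QueueDeclare(Queue, false, false, true, null);
            // A fanout exchange ignores the binding's routing key, so any value works here.
            channel.QueueBind(Queue, Exchange, "anything");

            var consumer = new QueueingBasicConsumer(channel);
            // Second argument is noAck = true: messages are acknowledged automatically on delivery.
            channel.BasicConsume(Queue, true, consumer);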

            Console.WriteLine("In Consumer");
            Console.WriteLine("===========");
            while (true)
            {
                try
                {
                    var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var message = Encoding.UTF8.GetString(eventArgs.Body);
                    Console.WriteLine(message);
                    Console.WriteLine("--------");
                }
                catch (EndOfStreamException)
                {
                    // The consumer was cancelled, the model closed, or the connection went away.
                    break;
                }
            }

            channel.Close();
            connection.Close();

        }
    }
}

This code ran fine. I then referred back to this diagram:

and decided to add another consumer:

Consumer2

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer2
{
    class Program
    {
        private const string Exchange = "fanout-exchange-example";
        private const string Queue = "quotes2";

        static void Main()
        {
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();

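            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
            // Same settings as in Consumer; only the queue name ("quotes2") differs.
            channel.QueueDeclare(Queue, false, false, true, null);
            // A fanout exchange ignores the binding's routing key, so any value works here.
            channel.QueueBind(Queue, Exchange, "anything");

            var consumer = new QueueingBasicConsumer(channel);
            // Second argument is noAck = true: messages are acknowledged automatically on delivery.
            channel.BasicConsume(Queue, true, consumer);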

            Console.WriteLine("In Consumer2");
            Console.WriteLine("===========");
            while (true)
            {
                try
                {
                    var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var message = Encoding.UTF8.GetString(eventArgs.Body);
                    Console.WriteLine(message);
                    Console.WriteLine("--------");
                }
                catch (EndOfStreamException)
                {
                    // The consumer was cancelled, the model closed, or the connection went away.
                    break;
                }
            }

            channel.Close();
            connection.Close();

        }
    }
}

When I ran the application, both consumers received the same messages, which is what we expected:

[Image: Fanout]

Redeclaring the exchange to be direct

First I deleted the “fanout-exchange-example” exchange using the web interface (redeclaring an existing exchange with a different type fails, so the old one has to go first). Then, in “Producer”, “Consumer” and “Consumer2”, I changed the exchange declaration to:

channel.ExchangeDeclare(Exchange, ExchangeType.Direct);

Upon running the application, I got the same results as before. I was initially surprised by this but upon looking at the code, the architecture and the model of RabbitMQ started making more sense. So, let’s break it down by first starting with the Producer and in particular:

channel.ExchangeDeclare(Exchange, ExchangeType.Direct);
channel.BasicPublish(Exchange, "", null, message);

In the Producer, we have only declared the Exchange and its type. We have not bound it to any queue whatsoever. However, notice that we have specified the “routing key” to be the empty string. Bear that in mind as this will make more sense later.

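Now in Consumer (the same applies to Consumer2), consider the following code. Note that the binding key here is the empty string rather than the “anything” shown in the earlier listings; a fanout exchange ignores that key, but a direct exchange does not: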

channel.ExchangeDeclare(Exchange, ExchangeType.Direct);
channel.QueueDeclare(Queue, false, false, true, null);
channel.QueueBind(Queue, Exchange, "");

First we declare an exchange, which is the same as in the Producer. Then we declare a queue, into which the exchange will route the messages. Finally, and very importantly, we bind the Queue to the Exchange. Here is the signature of the “QueueBind” method:

QueueBind(string queue, string exchange, string routingKey)

Now according to the RabbitMQ documentation:

A direct exchange delivers messages to queues based on the message routing key. A direct exchange is ideal for the unicast routing of messages (although they can be used for multicast routing as well). Here is how it works:
  • A queue binds to the exchange with a routing key K
  • When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R

So because both queues are bound to the Exchange with the empty string as the routing key (remember that I told you to bear this in mind earlier), and the Producer publishes with that same empty routing key, each message gets delivered to both Consumer and Consumer2.
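The K = R rule from the documentation can be tried out without a broker. What follows is a minimal in-memory sketch, not RabbitMQ code: the `DirectExchangeModel` class and its `Bind` and `Route` methods are hypothetical names I made up for this illustration, and they model nothing beyond the key comparison.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a direct exchange (not part of RabbitMQ.Client).
public class DirectExchangeModel
{
    // Each binding pairs a queue name (Key) with its binding key K (Value).
    private readonly List<KeyValuePair<string, string>> _bindings =
        new List<KeyValuePair<string, string>>();

    public void Bind(string queue, string bindingKey)
    {
        _bindings.Add(new KeyValuePair<string, string>(queue, bindingKey));
    }

    // Returns the queues that would receive a message published with routing key R.
    public List<string> Route(string routingKey)
    {
        return _bindings
            .Where(b => b.Value == routingKey)   // deliver only if K = R
            .Select(b => b.Key)
            .ToList();
    }
}

public static class DirectExchangeDemo
{
    public static void Main()
    {
        var exchange = new DirectExchangeModel();
        exchange.Bind("quotes", "");
        exchange.Bind("quotes2", "");

        // The Producer publishes with the empty routing key, so both bindings match.
        Console.WriteLine(string.Join(", ", exchange.Route("")));   // prints "quotes, quotes2"
    }
}
```

Calling `Route` with any other key, for example `exchange.Route("anything")`, returns an empty list in this model because no binding key equals it.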

To check that the routing key really was what made the difference, I changed the binding in “Consumer2” to use a different routing key:

channel.QueueBind(Queue, Exchange, "anything");

and this was the result:

[Image: RoutingKey]

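Hence, this demonstrates one of the differences between a fanout exchange and a direct exchange. With a direct exchange, a queue only receives messages whose routing key equals its binding key, so Consumer2’s “anything” binding no longer matches the Producer’s empty routing key. Had the exchange been a fanout exchange, the routing key wouldn’t have mattered one bit and both Consumer and Consumer2 would have received the same messages.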
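To make the contrast concrete, here is a small broker-free sketch of the two routing rules side by side. The `RoutingSketch` class and its `RouteDirect` and `RouteFanout` helpers are hypothetical names invented for this example; they model only the key handling described in this post and nothing else RabbitMQ does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers that model only the routing rules; this is not RabbitMQ.Client code.
public static class RoutingSketch
{
    // bindings maps a queue name to the binding key it was bound with.
    public static List<string> RouteDirect(IDictionary<string, string> bindings, string routingKey)
    {
        // Direct: a queue receives the message only when its binding key equals the routing key.
        return bindings
            .Where(b => b.Value == routingKey)
            .Select(b => b.Key)
            .OrderBy(q => q, StringComparer.Ordinal)
            .ToList();
    }

    public static List<string> RouteFanout(IDictionary<string, string> bindings, string routingKey)
    {
        // Fanout: the routing key is ignored and every bound queue receives the message.
        return bindings.Keys.OrderBy(q => q, StringComparer.Ordinal).ToList();
    }

    public static void Main()
    {
        var bindings = new Dictionary<string, string>
        {
            { "quotes", "" },          // Consumer keeps the empty binding key
            { "quotes2", "anything" }  // Consumer2 now binds with "anything"
        };

        // The Producer publishes with the empty routing key in both cases.
        Console.WriteLine("direct: " + string.Join(", ", RouteDirect(bindings, "")));  // direct: quotes
        Console.WriteLine("fanout: " + string.Join(", ", RouteFanout(bindings, "")));  // fanout: quotes, quotes2
    }
}
```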

Declaring both consumers to have the same queue

If you have been following along, you’ll want to revert the exchange type to fanout and delete the existing exchange on the server.

Next, I made both Consumer and Consumer2 use the “quotes” queue by setting the following in Consumer2:

private const string Queue = "quotes";

The result was quite interesting:

[Image: Same queue]

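As you can see, each message was processed by only one of the two consumers, and which consumer gets which message is not something the code controls. As far as I can tell, RabbitMQ shares the messages on a queue among its consumers, by default in round-robin fashion, so each message is delivered to one consumer only. Bear in mind that the consumers here use automatic acknowledgement (the “true” passed to BasicConsume), so a message handed to a consumer that then crashes is not redelivered. I guess that this could be used to spread the processing and possibly even out the load across servers.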
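The behaviour with two consumers on one queue can be imitated with plain collections. This is only a sketch of round-robin dispatch as I understand it: `RoundRobinSketch` and `Dispatch` are hypothetical names for this illustration, the message payloads are made-up placeholders, and real delivery order also depends on things like prefetch settings and acknowledgements, which are not modelled here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory imitation of round-robin dispatch; not RabbitMQ.Client code.
public static class RoundRobinSketch
{
    // Assigns each message to exactly one consumer, taking turns in order.
    public static Dictionary<string, List<string>> Dispatch(
        IList<string> consumers, IEnumerable<string> messages)
    {
        if (consumers.Count == 0)
        {
            throw new ArgumentException("At least one consumer is required.", "consumers");
        }

        var received = new Dictionary<string, List<string>>();
        foreach (var consumer in consumers)
        {
            received[consumer] = new List<string>();
        }

        int next = 0;
        foreach (var message in messages)
        {
            received[consumers[next]].Add(message);
            next = (next + 1) % consumers.Count;   // move on to the next consumer
        }
        return received;
    }

    public static void Main()
    {
        var consumers = new[] { "Consumer", "Consumer2" };
        // Placeholder payloads in the "symbol:value" shape the Producer uses.
        var messages = new[] { "GOOG:1", "HD:2", "MCD:3", "GOOG:4" };

        var received = Dispatch(consumers, messages);
        foreach (var consumer in consumers)
        {
            Console.WriteLine(consumer + " <- " + string.Join(", ", received[consumer]));
        }
        // Consumer <- GOOG:1, MCD:3
        // Consumer2 <- HD:2, GOOG:4
    }
}
```

With a fanout exchange and two separately named queues, each consumer would instead see all four messages; sharing one queue is what splits them.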

Posted in .NET, ALT.NET, RabbitMQ
