Getting to grips with EasyNetQ – Part 3 – Request Response


This post is about fleshing out the Request Response implementation of EasyNetQ.

Request class

using System;
using EasyNetQ;
using SubscribeWithLoggerConsoleAppNamespace;

namespace RequestClosureConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            var logger = new MyLogger();

            using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
            {
                var myRequest = new MyRequest();

                using (var publishChannel = bus.OpenPublishChannel())
                {
                    for (var i = 0; i < 5; i++)
                    {
                        myRequest.Text = "Send to Response Server " + i;
                        publishChannel.Request<MyRequest, MyResponse>(myRequest, response =>
                                                                                logger.InfoWrite(
                                                                                     "Got response: {0}", response.Text));                        
                    }
                    Console.ReadKey();
                }
            }
        }
    }
}

The code highlighted in blue is the interesting part because it again assumes that there is some response code listening in to the incoming requests.

Upon running this code, this is what happens:

ENQ-RequestRunOverviewAn exhange has been added but when you look at the details of the exchange added, you’ll see that it’s different to when we run “Publish” code.

ENQ-RequestExchangeFirst of all, the name of the exchange doesn’t follow the same pattern and is simply named “easy_net_q_rpc”. The other difference is the fact that the exchange type is not “topic” but is “direct” instead.

The behaviour while running this code differs in the fact that “queues” are actually created when sending data:

ENQ-QueuesCreatedFollowingRequestIf you look at the names of the queues created, you’ll notice that they are named “easynetq.response.xxxxxx”. The fact that they are contain “response” and the fact that the “Ready” and “Total” columns are all 0 would indicate that these queues would be used for the Response code to send data to. However, when you look at the details in each queue, you will see that none of the queues is bound to the newly created exchange i.e. “easy_net_q_rpc” and they are in fact bound to the default exchange.

ENQ-RandomQueueBinding

This was, to say the least, quite puzzling. Hence, I asked about it on the forums and here is the answer from Mike Hadlow:

Each request creates a short lived temporary queue. They can only be consumed from the requester, but they are bound to an exchange 
that the responder can publish to. Each gets one message (the response), which is swiftly consumed, so you are unlikely to see the 
message on the queue.
Because you end up with 5 consumers all bound to different queues, there's no real order. It's down to whichever RabbitMQ sends first.

Upon exiting the console application, all the queues created by the latter are removed although the exchange created does remain.

Response class

using System;
using EasyNetQ;
using RequestClosureConsoleAppNamespace;
using SubscribeWithLoggerConsoleAppNamespace;

namespace ResponseClosureConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            var logger = new MyLogger();
            Func<MyRequest, MyResponse> respond = request =>
                {
                    logger.InfoWrite("Received {0} ", request.Text);
                    return new MyResponse
                        {
                            Text = "Responding to " + request.Text
                        };
                };

            using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
            {
                bus.Respond(respond);
                Console.ReadKey();
            }
        }
    }
}

This code is fairly straighforward in the sense that it kind of echoes the content of the Request.

If you now run both console applications, this is what will happen:

ENQ-ResponseQueueCreatedFirst a new queue “RequestClosureConsoleAppNamespace_MyRequest:RequestClosureConsoleAppAssembly” is created and the naming convention follows the one used for a PubSub exchange scenario i.e. there is no “id” in the name.

The other is the order in which the messages are sent and received:

Response log

2013-08-16 11:17:42.6424 INFO Received Send to Response Server 1 
2013-08-16 11:17:42.6414 INFO Received Send to Response Server 0 
2013-08-16 11:17:42.6424 INFO Received Send to Response Server 4 
2013-08-16 11:17:42.6424 INFO Received Send to Response Server 3 
2013-08-16 11:17:42.6424 INFO Received Send to Response Server 2

Request log

2013-08-16 11:17:42.7865 INFO Got response: Responding to Send to Response Server 2
2013-08-16 11:17:42.7985 INFO Got response: Responding to Send to Response Server 0
2013-08-16 11:17:42.7985 INFO Got response: Responding to Send to Response Server 3
2013-08-16 11:17:42.8155 INFO Got response: Responding to Send to Response Server 4
2013-08-16 11:17:42.8155 INFO Got response: Responding to Send to Response Server 1

This is really surprising because not only would it appear that the requests were sent in random order but the responses were processed and sent back in random order as well.

For completeness, here are the

MyRequest and MyResponse classes

namespace RequestClosureConsoleAppNamespace
{
    public class MyRequest|MyResponse    
    {
        public string Text { get; set; }
    }
}
Posted in .NET, ALT.NET, EasyNetQ, RabbitMQ

Getting to grips with EasyNetQ – Part 2 – Logger


In part 1 of this series about EasyNetQ, I’ve provided an overview of what happens when we use the Publish and Subscribe methods as well as explaining the actual default behaviour. In part 2, I will experiment with the logger in EasyNetQ and is a logical continuation of part 1.

In this particular case, I will log to a file using NLog rather than logging to the console which is the default behaviour in EasyNetQ. So here is the code (source code available here) used:

Subscriber with logging

using EasyNetQ;
using PublishConsoleAppNamespace;

namespace SubscribeWithLoggerConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            var logger = new MyLogger();
            logger.InfoWrite("In subscriber");
            var bus = RabbitHutch.CreateBus("host=Ubuntu-12", x => x.Register<IEasyNetQLogger>(_ => logger));

            bus.Subscribe<MyMessage>("SubscribeWithLoggerConsoleAppId"
                                     , msg => logger.InfoWrite(msg.Text));
        }
    }
}

MyLogger class

using System;
using EasyNetQ;
using NLog;

namespace SubscribeWithLoggerConsoleAppNamespace
{
    public class MyLogger:IEasyNetQLogger
    {
        private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
        public void DebugWrite(string format, params object[] args)
        {
            Logger.Debug(format);
        }

        public void InfoWrite(string format, params object[] args)
        {
            Logger.Info(format);
        }

        public void ErrorWrite(string format, params object[] args)
        {
            Logger.Error(format);
        }

        public void ErrorWrite(Exception exception)
        {
            Logger.ErrorException("An exception has occurred", exception);
        }
    }
}

NLog configuration file

<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

  <!-- 
  See http://nlog-project.org/wiki/Configuration_file 
  for information on customizing logging rules and outputs.
   -->
  <targets>
    <!-- add your targets here -->

    <target xsi:type="File" name="f" fileName="${basedir}/logs/${shortdate}.log"
            layout="${longdate} ${uppercase:${level}} ${message}" />

  </targets>

  <rules>
    <!-- add your logging rules here -->

    <logger name="*" minlevel="Trace" writeTo="f" />

  </rules>
</nlog>

The code in the SubscribeWithLogger Console App is the one of interest and in particular:

var logger = new MyLogger();
logger.InfoWrite("In subscriber");
var bus = RabbitHutch.CreateBus("host=Ubuntu-12", x => x.Register<IEasyNetQLogger>(_ => logger));

bus.Subscribe<MyMessage>("SubscribeWithLoggerConsoleAppId"
                         , msg => logger.InfoWrite(msg.Text));

This is pretty much the code described in the logging section of  this page. So first, you should run this code on its own and when you do so, the console application will just remain there. That is due to the fact that we are not disposing the connection:

bus.Dispose();

The reason why I’m not disposing of the connection is due to the fact that I don’t want to encounter the issue described at the end of the post in the first post in this series.

In any case close the application and if you look at the RabbitMQ admin web page, you will see that a new queue has been added because I used a new id “SubscribeWithLoggerConsoleAppId”:

ENQ-Queue for loggerNote that this queue has been bound to the already existing exchange “PublishConsoleAppNamespace_MyMessage:PublishConsoleAppAssembly”.

Now run the PublishConsoleApp code, which is also provided in the first post, on its own. What actually surprised me in this case was the number of messages being queued:

ENQ-20000MsgsQueuedIn the first post when we ran this, we had 10,000 messages queued where as now we had twice that amount. In fact, if we look at the “Queues” page we see that each queue contains 10,000 messages.

ENQ-QueueBreakdownSo, the logical question is what messages are now on each queue and the answer lies in running each of the subscriber application in turn.

This is the output when the information is sent to the console window.

ENQ-SubscribeConsoleand this is an excerpt of the output when the information is sent to a log file:

2013-08-15 12:17:04.9033 INFO Hello Rabbit9992
2013-08-15 12:17:04.9033 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9033 INFO Hello Rabbit9993
2013-08-15 12:17:04.9203 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9203 INFO Hello Rabbit9994
2013-08-15 12:17:04.9203 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9333 INFO Hello Rabbit9995
2013-08-15 12:17:04.9333 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9333 INFO Hello Rabbit9996
2013-08-15 12:17:04.9333 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9333 INFO Hello Rabbit9997
2013-08-15 12:17:04.9533 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9533 INFO Hello Rabbit9998
2013-08-15 12:17:04.9533 DEBUG Received 
    RoutingKey: '{0}'
    CorrelationId: '{1}'
    ConsumerTag: '{2}'
2013-08-15 12:17:04.9663 INFO Hello Rabbit9999

Effectively, it would appear that both queues contained the same published data and although the exchange type is topic, it is behaving very much like a fanout exchange i.e. the messages published are sent to all bound queues.

When you look at the overview of these two operations, you will see that both operations were quite quick although the debug to console (1) was quicker:

ENQ-TimeTaken

Posted in .NET, ALT.NET, EasyNetQ, RabbitMQ

Getting to grips with EasyNetQ – Part 1- PubSub


Following on from the series I had on RabbitMQ, I then moved onto EasyNetQ. Now, if you have no idea what RabbitMQ is, then this post is unlikely to be appropriate. I would suggest to first go through the blog posts by Derek Greer and then maybe go through the related blog posts. Having a basic understanding of AMQP (like I do) would also be very beneficial.

Now this post is not really the “Hello World” of how to use EasyNetQ. The GitHub wiki and this video tutorial amply demonstrate how to get going. What I want to achieve in this post is to hopefully give a bit more detail about what and how EasyNetQ is doing under the covers.

So, what I did first was to fork the project on Gitbub so that I could then debug code execution. Then I created my own project which would then reference the forked version of EasyNetQ. Hence, unless you also do the same, you won’t be able to use the code as is.

Publishing

Let’s start with some code in order to publish a message onto the message broker.

using System.Threading;
using EasyNetQ;

namespace PublishConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
            {
                Thread.Sleep(1000);                
                try
                {
                    using (var publishChannel = bus.OpenPublishChannel())
                    {
                        for (var i = 0; i < 10000; i++)
                        {
                            var message = new MyMessage
                            {
                                Text = "Hello Rabbit" + i
                            };
                          
                            publishChannel.Publish(message);
                        }
                    }
                }
                catch (EasyNetQException)
                {
                    // the server is not connected
                }
            }
        }
    }
}

namespace PublishConsoleAppNamespace
{
    public class MyMessage
    {
        public string Text { get; set; }
    }
}

Before running this code, here is the state of my RabbitMQ server:

ENQ-OverviewUpon running the code, here is what happens:

ENQ-PublishThe first thing to notice is that a message is indeed received on the server but that it is not queued. Hence, it is indication that when we publish a message, EasyNetQ does not declare a queue and this is confirmed by the fact that in “Global count – Queues ” = 0. However, a new exchange has indeed been created:

ENQ-Exchange created

The exchange is named according to the following pattern namespace_class:assembly and hence in our case, the name of our exchange is “PublishConsoleAppNamespace_MyMessage:PublishConsoleApp”. When I mentioned “namespace”, it is the namespace of the DTO “MyMessage” i.e. the exchange name is based upon the DTO class. The type of our exchange is “Topic”. Upon drilling into this exchange we’ll see that there are no queues bound to it:

ENQ-Exchange bindingsBefore closing the discussion about the Publisher code, I would like to draw your attention to this line:

var message = new MyMessage
{
    Text = "Hello Rabbit" + i
};

publishChannel.Publish(message);

Notice that we are passing message in as an argument to the Publish method and that the argument is of type “MyMessage”.

Were the code to be this:

var message = new MyMessage
{
    Text = "Hello Rabbit" 
};

publishChannel.Publish((message.Text + i));

Then the argument type you’re passing in actually a “String”. Hence, a new exchange will be created!

ENQ-StringPassedIn

So be careful and ascertain that you the right type is getting passed as an argument to the Publish method.

As the Publisher will not create the queue, it follows that in EasyNetQ, that this will happen in the Subscriber code.

Subscribing

So I then created a new console application:

using System;
using EasyNetQ;
using PublishConsoleApp;

namespace SubscribeConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("In subscriber");
            using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
            {
                bus.Subscribe<MyMessage>("SubscribeConsoleAppId"
                    , msg => Console.WriteLine(msg.Text));
                Console.ReadKey();
            }
        }
    }
}

Upon running this code on its own i.e. don’t run the PublisherConsoleApp code at the same time, the following message will appear in the console:

In subscriber
DEBUG: Trying to connect
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'Ubuntu-12', Port: 5672, VHost: '/'
DEBUG: Declared Consumer. queue='PublishConsoleAppNamespace_MyMessage:PublishConsoleAppAssembly_SubscribeConsoleAppId', prefetchcount=50

This confirms that when we first ran the publisher, the messages just fell into a black hole. Moreover, as the last line and the following confirm, a queue has been created:

ENQ-Queue created NB: Had we run the Subscriber code before the Publisher code, both the queue and the exchange would have been created by EasyNetQ.

The name of the queue is again based upon the SubscribeConsoleApp code and the pattern is namespace_class:assembly_id. The naming requires further explanation and stems from this line of code:

bus.Subscribe<MyMessage>("SubscribeConsoleAppId"
                    , msg => Console.WriteLine(msg.Text));

Effectively, EasyNetQ would appear to be looking at the generic type being passed in i.e. “MyMessage” as well as the id i.e. “SubscribteConsoleAppId” in order to build the name of the queue. Hence, the namespace and assembly are “PublishConsoleAppNamespace”, “PublishConsoleAppAssembly” rather than “SubscribeConsoleAppNamespace”, “SubscribeConsoleAppAssembly”. So, the full name of the queue is “PublishConsoleAppNamespace_MyMessage:PublishConsoleAppAssembly_SubscribeConsoleAppId”

Moreover, you can also see that the queue has been bound to the exchange created earlier. Conversely, the exchange shows that the newly created queue has been bound

ENQ-Exchange bound to queue

One other thing to note is that with EasyNetQ the queue is durable i.e. even after a server restart, the queue will still be there.

Now, when you run the PublishConsoleApp on its own, you should see this:

ENQ-Messages queuedIn other words, the published messages have now been put into the queue until such time that the SubscribeConsoleApp is run.

ENQ-Messages consumed

One of the problems I encounted happened when I changed the code to this:

Console.WriteLine("In subscriber");
using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
{
    bus.Subscribe<MyMessage>("SubscribeConsoleAppId"
        , msg => Console.WriteLine(msg.Text));
}
Console.ReadKey();

In other words, I moved the code after the connection was closed and here is what was output to the console:

In subscriber
DEBUG: Trying to connect
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'Ubuntu-12', Port: 5672, VHost: '/'
DEBUG: Declared Consumer. queue='PublishConsoleAppNamespace_MyMessage:PublishCon
soleAppAssembly_SubscribeConsoleAppId', prefetchcount=50
DEBUG: Received
        RoutingKey: ''
        CorrelationId: '75328e0f-20b4-43c0-b597-701bfde3f50c'
        ConsumerTag: 'ec9d949f-f42c-4fd1-810e-2a17f25a2ffd'
DEBUG: Model Shutdown for queue: 'PublishConsoleAppNamespace_MyMessage:PublishCo
nsoleAppAssembly_SubscribeConsoleAppId'
DEBUG: Connection disposed
Hello Rabbit0
INFO: Basic ack failed because channel was closed with message The AMQP operatio
n was interrupted: AMQP close-reason, initiated by Application, code=200, text="
Goodbye", classId=0, methodId=0, cause=. Message remains on RabbitMQ and will be
 retried.

As I was unsure about what was happening I posted this issue on the forums and this is the answer provided by Mike Hadlow, one of the authors of EasyNetQ:

When you dispose the IBus instance, the connection to the RabbitMQ broker is closed. 
The subscription handler (msg => Console.WriteLine(msg.Text)) runs on a separate thread 
when a message is received, so your code is setting up a subscription and then immediately 
closing the connection. I'm surprised that you are even seeing the WriteLine running. By 
the time it comes to send the ack the connection is closed and you get the error that you're seeing.

A basic application scenario is to open a connection (RabbitHutch.CreateBus) and keep it open for the 
lifetime of the app. When the app exits, you dispose the IBus instance which closes the connection.

So that poses the question of “How do I write the subscriber for a console application and closing the connection when the application is shut down?”. At this point, I am not quite sure but once I do, I’ll blog about it.

EDIT

Here is a site which I’ve found and which might solve the issue of running code at console application shut down. I have not tried it but it might be worth a try.

Posted in .NET, ALT.NET, EasyNetQ, RabbitMQ

Experimenting with RabbitMQ – Topic exchange


Preamble

This is part 4 of a series of blogposts about RabbitMQ. This series aims to provide more information (I cannot vouch for the accuracy of the information as I’m a beginner at RabbitMQ) concerning a series of posts by Derek Greer.

Part 1: Experimenting with RabbitMQ – HelloWorldExample

Part 2: Experimenting with RabbitMQ – LoggingApplication example

Part 3: Experimenting with RabbitMQ – Fanout exchange

Part 4: Experimenting with RabbitMQ – Topic exchange

————————

The code provided here demonstrates a topic exchange and running it works fine (you can get the code here). However, then I tried to filter all the “Business” messages to another consumer and here what I tried first:

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer2
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("Should show only Business messages");
            Console.WriteLine("==================================");
            const string exchange = "topic-exchange-example";
            const string queue = "log";

            var connectionFactory = new ConnectionFactory();
            var connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();

            channel.ExchangeDeclare(exchange, ExchangeType.Topic, false, true, null);
            channel.QueueDeclare(queue, false, false, true, null);
            channel.QueueBind(queue, exchange, "*.Business.*");

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);

            while (true)
            {
                try
                {
                    var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var message = Encoding.UTF8.GetString(eventArgs.Body);
                    Console.WriteLine(string.Format("{0} - {1}", eventArgs.RoutingKey, message));
                }
                catch (EndOfStreamException)
                {
                    // The consumer was cancelled, the model closed, or the connection went away.
                    break;
                }
            }
            channel.Close();
            connection.Close();
        }
    }
}

This is nearly identical to the Consumer application provided in the original post. When I ran this I got:

Topic

This was definitely not what I was expecting because I would have thought that by specifying

channel.QueueBind(queue, exchange, "*.Business.*");

would have been enough to do the filtering.

Upon further thought, what this is, I believe, saying is bind all the messages that match the routing key “*.Business.*” to the queue “log”. Now, if you consider the same code in “Consumer” itself:

channel.QueueBind(queue, exchange, "*.Personal.*");

So effectively we have bound all messages matching “*.Business.*” and “*.Personal.*” to the same queue “log” and we dequeue it, we’ll get a mixture of both. Hence, the solution to that problem is to create two queues “log-personal” and “log-business” which results in:

Topic correctThis in fact corresponds to this diagram:

Posted in .NET, ALT.NET, RabbitMQ

Experimenting with RabbitMQ – Fanout exchange


Preamble

This is part 3 of a series of blogposts about RabbitMQ. This series aims to provide more information (I cannot vouch for the accuracy of the information as I’m a beginner at RabbitMQ) concerning a series of posts by Derek Greer.

Part 1: Experimenting with RabbitMQ – HelloWorldExample

Part 2: Experimenting with RabbitMQ – LoggingApplication example

Part 3: Experimenting with RabbitMQ – Fanout exchange

Part 4: Experimenting with RabbitMQ – Topic exchange

————————

Here are some experiments I carried out based upon this post and here is my version which you can obtain here:

Producer

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Linq;
using RabbitMQ.Client;

namespace Producer
{
    class Program
    {
        private static volatile bool _cancelling;
        private const string Exchange = "fanout-exchange-example";
        static void Main()
        {
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();
            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);

            var thread = new Thread(() => PublishQuotes(channel));
            thread.Start();

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();

            _cancelling = true;

            channel.Close();
            connection.Close();
        }

        private static void PublishQuotes(IModel channel)
        {
            while (true)
            {
                if (_cancelling)
                {
                    return;
                }
                IEnumerable<string> quotes = FetchStockQuotes(new[] { "GOOG", "HD", "MCD" });

                foreach (var quote in quotes)
                {
                    byte[] message = Encoding.UTF8.GetBytes(quote);
                    channel.BasicPublish(Exchange, "", null, message);
                }
                Thread.Sleep(5000);
            }
        }

        private static IEnumerable<string> FetchStockQuotes(string[] symbols)
        {
            var quotes = new List<string>();

            string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys",
                String.Join("%2C", symbols.Select(s => "%22" + s + "%22")));
            var wc = new WebClient { Proxy = WebRequest.DefaultWebProxy };
            var ms = new MemoryStream(wc.DownloadData(url));
            var reader = new XmlTextReader(ms);
            XDocument doc = XDocument.Load(reader);
            XElement results = doc.Root.Element("results");

            foreach (string symbol in symbols)
            {
                XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol);
                quotes.Add(symbol + ":" + q.Element("AskRealtime").Value);
            }

            return quotes;
        }
    }
}

Consumer

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
    class Program
    {
        private const string Exchange = "fanout-exchange-example";
        private const string Queue = "quotes";

        static void Main()
        {
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();

            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
            channel.QueueDeclare(Queue, false, false, true, null);
            channel.QueueBind(Queue, Exchange, "anything");

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(Queue, true, consumer);

            Console.WriteLine("In Consumer");
            Console.WriteLine("===========");
            while (true)
            {
                try
                {
                    var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var message = Encoding.UTF8.GetString(eventArgs.Body);
                    Console.WriteLine(message);
                    Console.WriteLine("--------");
                }
                catch (EndOfStreamException)
                {
                    // The consumer was cancelled, the model closed, or the connection went away.
                    break;
                }
            }

            channel.Close();
            connection.Close();

        }
    }
}

This code when run worked fine. Then I referred back to this diagram:

and decided to add another consumer

Consumer2

using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer2
{
    class Program
    {
        private const string Exchange = "fanout-exchange-example";
        private const string Queue = "quotes2";

        static void Main()
        {
            var connectionFactory = new ConnectionFactory();
            IConnection connection = connectionFactory.CreateConnection();
            IModel channel = connection.CreateModel();

            channel.ExchangeDeclare(Exchange, ExchangeType.Fanout);
            channel.QueueDeclare(Queue, false, false, true, null);
            channel.QueueBind(Queue, Exchange, "anything");

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(Queue, true, consumer);

            Console.WriteLine("In Consumer2");
            Console.WriteLine("===========");
            while (true)
            {
                try
                {
                    var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var message = Encoding.UTF8.GetString(eventArgs.Body);
                    Console.WriteLine(message);
                    Console.WriteLine("--------");
                }
                catch (EndOfStreamException)
                {
                    // The consumer was cancelled, the model closed, or the connection went away.
                    break;
                }
            }

            channel.Close();
            connection.Close();

        }
    }
}

Upon running the application both consumers would receive the same information which is what we expected:

Fanout

Redeclaring the exchange to be direct

First I delete the “fanout-exchange-example” exchange using the web interface. Then in “Producer”, “Consumer” and “Consumer2”, I changed the exchange declaration to:

channel.ExchangeDeclare(Exchange, ExchangeType.Direct);

Upon running the application, I got the same results as before. I was initially surprised by this but upon looking at the code, the architecture and the model of RabbitMQ started making more sense. So, let’s break it down by first starting with the Producer and in particular:

channel.ExchangeDeclare(Exchange, ExchangeType.Direct);
channel.BasicPublish(Exchange, "", null, message);

In the Producer, we have only declared the Exchange and its type. We have not bound it to any queue whatsoever. However, notice that we have specified the “routing key” to be the empty string. Bear that in mind as this will make more sense later.

Now in Consumer (the same applies for Consumer2 here), consider the following code:

channel.ExchangeDeclare(Exchange, ExchangeType.Direct);
channel.QueueDeclare(Queue, false, false, true, null);
channel.QueueBind(Queue, Exchange, "");

First we’re declaring an exchange which is the same as in the Producer. Then we also declare a queue where we will push the message. Finally, and very importantly, we bind the Queue to the Exchange and here is the signature of the “QueueBind” method:

QueueBind(string queue, string exchange, string routingKey)

Now according to the RabbitMQ documentation:

A direct exchange delivers messages to queues based on the message routing key. A direct exchange is ideal for the unicast routing of messages 
(although they can be used for multicast routing as well). Here is how it works:
  • A queue binds to the exchange with a routing key K
  • When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R

So when we bound the Queue to the Exchange using the empty string routing key (remember that I told you to bear this in mind earlier), the message will get delivered to both Consumer and Consumer2.

To prove that this was indeed the case, what I did was to change the routing key for “Consumer2” by using this:

channel.QueueBind(Queue, Exchange, "anything");

and this was the result:

RoutingKey

Hence, this demonstrates one of the differences between a fanoutexchange and a direct exchange. Had the exchange been a fanout exchange, the routing key wouldn’t have mattered one bit and both Consumer and Consumer2 would have received the same messages.

Declaring both consumers to have the same queue

If you have been following along, then you’ll want to revert the exchange type to being fanout and deleting the existing exchange on the server.

So what I did next was to ensure that both Consumer and Consumer2 were using the “quotes” queue by setting the following in Consumer2

private const string Queue = "quotes";

The result was quite interesting

Same queue

As you can see, each consumer would process the message indeterminately i.e. what message will be processed on which server is not controlled. What can be guaranteed is that the messages will be processed. I guess that this could be used to distribute the processing randomly and possibly even out the load across servers.

Posted in .NET, ALT.NET, RabbitMQ

Experimenting with RabbitMQ – LoggingApplication example


Preamble

This is part 2 of a series of blogposts about RabbitMQ. This series aims to provide more information (I cannot vouch for the accuracy of the information as I’m a beginner at RabbitMQ) concerning a series of posts by Derek Greer.

Part 1: Experimenting with RabbitMQ – HelloWorldExample

Part 2: Experimenting with RabbitMQ – LoggingApplication example

Part 3: Experimenting with RabbitMQ – Fanout exchange

Part 4: Experimenting with RabbitMQ – Topic exchange

————————

These are some notes based upon this blogpost.

In this example, the Producer and Consumer applications differ from the HelloWorldExample in the following ways.

Logging Application’s Producer

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace Producer
{
  class Program
  {
    static void Main(string[] args)
    {
      Thread.Sleep(1000);
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);
      string value = DoSomethingInteresting();
      string logMessage = string.Format("{0}: {1}", TraceEventType.Information, value);

      byte[] message = Encoding.UTF8.GetBytes(logMessage);
      channel.BasicPublish("direct-exchange-example", string.Empty, null, message);

      channel.Close();
      connection.Close();
    }

    static string DoSomethingInteresting()
    {
      return Guid.NewGuid().ToString();
    }
  }
}

In this particular instance, the producer explicitly declares a “Direct Exchange” but there is no queue declared. Instead, when we publish the message, we send it to the “direct-exchange-example” exchange and pass the empty string as the routing key.

The author also emphasises the following:

Note that our logging example’s Producer differs from our Hello World’s Producer in that we didn’t declare a queue this time.  
In our Hello World example, we needed to run our Producer before the Consumer since the Consumer simply retrieved a single message and exited.  
Had we published to the default exchange without declaring the queue first, our message would simply have been discarded by the server before 
the Consumer had an opportunity to declare and bind the queue.

So I decided to go back and experiment and declared the empty string as being the queue and the routing key.

HelloWorld’s Producer

static void Main()
{
    const string queue = "";//"hello-world-queue";
    var connectionFactory = new ConnectionFactory();
    IConnection connection = connectionFactory.CreateConnection();

    IModel channel = connection.CreateModel();

    channel.QueueDeclare(queue, false, false, false, null);

    byte[] message = Encoding.UTF8.GetBytes("Hello, World!");
    channel.BasicPublish(string.Empty, queue, null, message);

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
    channel.Close();
    connection.Close();
}

When I ran HelloWorld’s Producer, three things of note happened:

  1. A message was indeed sent to the server
  2. As Derek Greer pointed out, the server discarded the message i.e. it did not get queued.
  3. Actually a queue was created. It was named amq.gen-wk-vqJ4WsDriToH7XOs0sg  with the part in red being randomly generated. Hence running, this code again created another queue.

EmptyStringForQueue

Logging Application’s Consumer

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Consumer
{
  class Program
  {
    static void Main(string[] args)
    {
      var connectionFactory = new ConnectionFactory();
      IConnection connection = connectionFactory.CreateConnection();
      IModel channel = connection.CreateModel();

      channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Direct);
      channel.QueueDeclare("logs", false, false, true, null);
      channel.QueueBind("logs", "direct-exchange-example", "");

      var consumer = new QueueingBasicConsumer(channel);
      channel.BasicConsume("logs", true, consumer);

      var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();

      string message = Encoding.UTF8.GetString(eventArgs.Body);
      Console.WriteLine(message);

      channel.Close();
      connection.Close();
      Console.ReadLine();
    }
  }
}

In the consumer code, there are various aspects which are interesting. First, we are creating a queue which we are calling “logs” but the point to take from this is that the “auto-delete” has been set to true (We’ll return to this later).

The second point to note is that we are binding the queue to the exchange:

channel.QueueBind("logs", "direct-exchange-example", "");

We are retrieving the message differently to HelloWorld’s Consumer:

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("logs", true, consumer);
var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();

The fact that we are using a BasicConsume instead of the BasicGet will cause the queue to be deleted automatically (remember auto-delete mentioned above) as soon as the application is terminated.

Also, just to complete what Derek Greer mentioned in his post, the Dequeue method is a blocking call. What that means is if we were to run the Consumer application first, execution would be blocked on

var eventArgs = (BasicDeliverEventArgs) consumer.Queue.Dequeue();

until we ran the Producer application which published a message.

One thing that still puzzles me though is this. In the Producer code, when we published the message we didn’t assign a particular routing key.

channel.BasicPublish("direct-exchange-example", string.Empty, null, message);

Then when we retrieved the message, it somehow got put on the “logs” queue.

EDIT: Answer to the last puzzle.

This was in fact addressed in the original post and I quote:

To associate our logs queue with our exchange, we use the QueueBind() method providing the name of the queue, the name of the exchange, and the binding key to filter messages on:
channel.QueueBind("logs", "direct-exchange-example", "");
Posted in .NET, ALT.NET, RabbitMQ

Experimenting with RabbitMQ – HelloWorldExample


Preamble

This is part 1 of a series of blogposts about RabbitMQ. This series aims to provide more information (I cannot vouch for the accuracy of the information as I’m a beginner at RabbitMQ) concerning a series of posts by Derek Greer.

Part 1: Experimenting with RabbitMQ – HelloWorldExample

Part 2: Experimenting with RabbitMQ – LoggingApplication example

Part 3: Experimenting with RabbitMQ – Fanout exchange

Part 4: Experimenting with RabbitMQ – Topic exchange

————————

 

I am currently following the excellent series on RabbitMQ by Derek Greer. The latter give a very good introduction of the basic concepts of RabbitMQ. However, some of the implementation details have been glossed over and I aim to document some of these minor points as I understand them.

From his explanation of the “HelloWorldExample” he has got the following code:

Producer

class Program
{
  static void Main(string[] args)
  {
    var connectionFactory = new ConnectionFactory();
    IConnection connection = connectionFactory.CreateConnection();
    IModel channel = connection.CreateModel();
    channel.QueueDeclare("hello-world-queue", false, false, false, null);
    byte[] message = Encoding.UTF8.GetBytes("Hello, World!");
    channel.BasicPublish(string.Empty, "hello-world-queue", null, message);
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
    channel.Close();
    connection.Close();
  }
}

Consumer

class Program
{
  static void Main(string[] args)
  {
    var connectionFactory = new ConnectionFactory();
    IConnection connection = connectionFactory.CreateConnection();
    IModel channel = connection.CreateModel();
    channel.QueueDeclare("hello-world-queue", false, false, false, null);
    BasicGetResult result = channel.BasicGet("hello-world-queue", true);
    if (result != null)
    {
      string message = Encoding.UTF8.GetString(result.Body);
      Console.WriteLine(message);
    }
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
    channel.Close();
    connection.Close();
  }
}

This code works and runs fine but what caught my attention was the fact that both the Producer and Consumer create the queue:

channel.QueueDeclare("hello-world-queue", false, false, false, null);

Hence, the first conclusion that can be drawn is the fact that redeclaring the same queue does not create a new queue on the server.

Read more ›

Posted in .NET, ALT.NET, RabbitMQ