Getting to grips with EasyNetQ – Part 1- PubSub


Following on from the series I had on RabbitMQ, I then moved onto EasyNetQ. Now, if you have no idea what RabbitMQ is, then this post is unlikely to be appropriate. I would suggest to first go through the blog posts by Derek Greer and then maybe go through the related blog posts. Having a basic understanding of AMQP (like I do) would also be very beneficial.

Now this post is not really the “Hello World” of how to use EasyNetQ. The GitHub wiki and this video tutorial amply demonstrate how to get going. What I want to achieve in this post is to hopefully give a bit more detail about what and how EasyNetQ is doing under the covers.

So, what I did first was to fork the project on Gitbub so that I could then debug code execution. Then I created my own project which would then reference the forked version of EasyNetQ. Hence, unless you also do the same, you won’t be able to use the code as is.

Publishing

Let’s start with some code in order to publish a message onto the message broker.

using System.Threading;
using EasyNetQ;

namespace PublishConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
            {
                Thread.Sleep(1000);                
                try
                {
                    using (var publishChannel = bus.OpenPublishChannel())
                    {
                        for (var i = 0; i < 10000; i++)
                        {
                            var message = new MyMessage
                            {
                                Text = "Hello Rabbit" + i
                            };
                          
                            publishChannel.Publish(message);
                        }
                    }
                }
                catch (EasyNetQException)
                {
                    // the server is not connected
                }
            }
        }
    }
}

namespace PublishConsoleAppNamespace
{
    public class MyMessage
    {
        public string Text { get; set; }
    }
}

Before running this code, here is the state of my RabbitMQ server:

ENQ-OverviewUpon running the code, here is what happens:

ENQ-PublishThe first thing to notice is that a message is indeed received on the server but that it is not queued. Hence, it is indication that when we publish a message, EasyNetQ does not declare a queue and this is confirmed by the fact that in “Global count – Queues ” = 0. However, a new exchange has indeed been created:

ENQ-Exchange created

The exchange is named according to the following pattern namespace_class:assembly and hence in our case, the name of our exchange is “PublishConsoleAppNamespace_MyMessage:PublishConsoleApp”. When I mentioned “namespace”, it is the namespace of the DTO “MyMessage” i.e. the exchange name is based upon the DTO class. The type of our exchange is “Topic”. Upon drilling into this exchange we’ll see that there are no queues bound to it:

ENQ-Exchange bindingsBefore closing the discussion about the Publisher code, I would like to draw your attention to this line:

var message = new MyMessage
{
    Text = "Hello Rabbit" + i
};

publishChannel.Publish(message);

Notice that we are passing message in as an argument to the Publish method and that the argument is of type “MyMessage”.

Were the code to be this:

var message = new MyMessage
{
    Text = "Hello Rabbit" 
};

publishChannel.Publish((message.Text + i));

Then the argument type you’re passing in actually a “String”. Hence, a new exchange will be created!

ENQ-StringPassedIn

So be careful and ascertain that you the right type is getting passed as an argument to the Publish method.

As the Publisher will not create the queue, it follows that in EasyNetQ, that this will happen in the Subscriber code.

Subscribing

So I then created a new console application:

using System;
using EasyNetQ;
using PublishConsoleApp;

namespace SubscribeConsoleAppNamespace
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("In subscriber");
            using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
            {
                bus.Subscribe<MyMessage>("SubscribeConsoleAppId"
                    , msg => Console.WriteLine(msg.Text));
                Console.ReadKey();
            }
        }
    }
}

Upon running this code on its own i.e. don’t run the PublisherConsoleApp code at the same time, the following message will appear in the console:

In subscriber
DEBUG: Trying to connect
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'Ubuntu-12', Port: 5672, VHost: '/'
DEBUG: Declared Consumer. queue='PublishConsoleAppNamespace_MyMessage:PublishConsoleAppAssembly_SubscribeConsoleAppId', prefetchcount=50

This confirms that when we first ran the publisher, the messages just fell into a black hole. Moreover, as the last line and the following confirm, a queue has been created:

ENQ-Queue created NB: Had we run the Subscriber code before the Publisher code, both the queue and the exchange would have been created by EasyNetQ.

The name of the queue is again based upon the SubscribeConsoleApp code and the pattern is namespace_class:assembly_id. The naming requires further explanation and stems from this line of code:

bus.Subscribe<MyMessage>("SubscribeConsoleAppId"
                    , msg => Console.WriteLine(msg.Text));

Effectively, EasyNetQ would appear to be looking at the generic type being passed in i.e. “MyMessage” as well as the id i.e. “SubscribteConsoleAppId” in order to build the name of the queue. Hence, the namespace and assembly are “PublishConsoleAppNamespace”, “PublishConsoleAppAssembly” rather than “SubscribeConsoleAppNamespace”, “SubscribeConsoleAppAssembly”. So, the full name of the queue is “PublishConsoleAppNamespace_MyMessage:PublishConsoleAppAssembly_SubscribeConsoleAppId”

Moreover, you can also see that the queue has been bound to the exchange created earlier. Conversely, the exchange shows that the newly created queue has been bound

ENQ-Exchange bound to queue

One other thing to note is that with EasyNetQ the queue is durable i.e. even after a server restart, the queue will still be there.

Now, when you run the PublishConsoleApp on its own, you should see this:

ENQ-Messages queuedIn other words, the published messages have now been put into the queue until such time that the SubscribeConsoleApp is run.

ENQ-Messages consumed

One of the problems I encounted happened when I changed the code to this:

Console.WriteLine("In subscriber");
using (var bus = RabbitHutch.CreateBus("host=Ubuntu-12"))
{
    bus.Subscribe<MyMessage>("SubscribeConsoleAppId"
        , msg => Console.WriteLine(msg.Text));
}
Console.ReadKey();

In other words, I moved the code after the connection was closed and here is what was output to the console:

In subscriber
DEBUG: Trying to connect
DEBUG: OnConnected event fired
INFO: Connected to RabbitMQ. Broker: 'Ubuntu-12', Port: 5672, VHost: '/'
DEBUG: Declared Consumer. queue='PublishConsoleAppNamespace_MyMessage:PublishCon
soleAppAssembly_SubscribeConsoleAppId', prefetchcount=50
DEBUG: Received
        RoutingKey: ''
        CorrelationId: '75328e0f-20b4-43c0-b597-701bfde3f50c'
        ConsumerTag: 'ec9d949f-f42c-4fd1-810e-2a17f25a2ffd'
DEBUG: Model Shutdown for queue: 'PublishConsoleAppNamespace_MyMessage:PublishCo
nsoleAppAssembly_SubscribeConsoleAppId'
DEBUG: Connection disposed
Hello Rabbit0
INFO: Basic ack failed because channel was closed with message The AMQP operatio
n was interrupted: AMQP close-reason, initiated by Application, code=200, text="
Goodbye", classId=0, methodId=0, cause=. Message remains on RabbitMQ and will be
 retried.

As I was unsure about what was happening I posted this issue on the forums and this is the answer provided by Mike Hadlow, one of the authors of EasyNetQ:

When you dispose the IBus instance, the connection to the RabbitMQ broker is closed. 
The subscription handler (msg => Console.WriteLine(msg.Text)) runs on a separate thread 
when a message is received, so your code is setting up a subscription and then immediately 
closing the connection. I'm surprised that you are even seeing the WriteLine running. By 
the time it comes to send the ack the connection is closed and you get the error that you're seeing.

A basic application scenario is to open a connection (RabbitHutch.CreateBus) and keep it open for the 
lifetime of the app. When the app exits, you dispose the IBus instance which closes the connection.

So that poses the question of “How do I write the subscriber for a console application and closing the connection when the application is shut down?”. At this point, I am not quite sure but once I do, I’ll blog about it.

EDIT

Here is a site which I’ve found and which might solve the issue of running code at console application shut down. I have not tried it but it might be worth a try.

Advertisements
Posted in .NET, ALT.NET, EasyNetQ, RabbitMQ

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: