Publish Subscribe in C# using ActiveMQ


According to the JMS specification, “The Java Message Service (JMS) sets the standard or rules for reliable Enterprise Messaging or Messaging Oriented Middleware (MOM).”

Messaging, and asynchronous messaging in particular, is part of almost any enterprise architecture nowadays. A message can loosely be described as any data exchanged between programs or processes, whether they run on the same computer or on different computers.

This article focuses on publishing live data and distributing it between different processes or computers as and when the data arrives. This is also called the Pub-Sub or Publish-Subscribe model. We are going to use C# as our programming language. Unfortunately, as far as I know, none of the Microsoft frameworks currently offers this feature out of the box.

The much-touted MSMQ, which offers asynchronous messaging, could not offer a push as far as I have tried. I do use MSMQ for various other purposes, and its performance is undoubtedly good, but ultimately it does not give us a direct pub-sub solution. If you find me wrong in this regard, please feel free to post a comment.

Basically, a process on the same computer or on some other computer in the network posts (or publishes) messages to one place, and a listener is notified whenever a new message arrives. The listener gets the message automatically; it does not have to send a request every minute or every hour. We can also call this a “push” model. It can be used in many places, such as stock prices, cricket scores and a lot more.

Now let us see how to use a JMS framework to get our push model working.

Basic Setup:

The primary ingredient of this whole exercise is ActiveMQ, which can be downloaded from the link http://activemq.apache.org/activemq-541-release.html. We are using the stable release that was current at the time of writing. Please make sure you install the prerequisites, such as the JDK, required by ActiveMQ. The version at the above link needs Java 1.6.

ActiveMQ is a very simple product to understand. It comes from the Apache family of open source frameworks, and in our experience it has been very stable. All we need to do is:

  • Download the program to a folder on your local machine
  • Execute bin\activemq.bat

We also have the option of installing it as a Windows service, in which case it starts automatically every time Windows starts.

ActiveMQ has two kinds of destination, queues and topics, to which a publisher can send messages and from which a subscriber can receive them.

ActiveMQ also has a .NET API named NMS, which can be downloaded from http://activemq.apache.org/nms/. Our program uses this library, since we don’t want to reinvent the wheel by building everything again.

NMS supports several providers and wire protocols (which is an entirely separate discussion); we are using the ActiveMQ provider over the OpenWire protocol for our purposes.

Components of an ActiveMQ Pub Sub Program:

A publish-subscribe program built on ActiveMQ has the following components.

  • ActiveMQ - Acts as the central hub that receives the data and pushes it out
  • Topic - The virtual path or location to which the messages are published
  • Publisher - The program that wants to push data to subscribers. There can be one or many
  • Subscriber - The program or programs that want to receive the published data

Apart from the above, the NMS API also exposes the following objects:

  • Connection - The handle to the conversation between the client and the ActiveMQ server
  • Session - The context, created from a connection, in which messages are produced and consumed
  • IMessageConsumer - The interface used to subscribe to messages from the ActiveMQ broker
  • IMessageProducer - The interface used to publish messages to the broker (to a topic or a queue)
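To make the roles of these objects concrete, here is a minimal sketch of how they might fit together when using the Apache.NMS and Apache.NMS.ActiveMQ assemblies directly. It assumes a broker listening on tcp://localhost:61616 and borrows the topic name used by the sample program; the class name PubSubSketch and the message text are made up for illustration, and this is not the code of the attached library.

```csharp
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

// Illustrative only: class name and message text are assumptions.
class PubSubSketch
{
    static void Main()
    {
        // Assumption: a broker is running locally on the default OpenWire port.
        IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616");

        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        {
            // Topic: the named destination shared by publisher and subscriber.
            ITopic topic = session.GetTopic("mysampletopic");

            using (IMessageConsumer consumer = session.CreateConsumer(topic))
            using (IMessageProducer producer = session.CreateProducer(topic))
            {
                // The Listener event fires whenever the broker pushes a message.
                consumer.Listener += message =>
                {
                    ITextMessage text = message as ITextMessage;
                    if (text != null)
                    {
                        Console.WriteLine("Received: " + text.Text);
                    }
                };

                // Messages are delivered to consumers only after the connection is started.
                connection.Start();

                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
                producer.Send(session.CreateTextMessage("Hello from NMS"));

                // Give the asynchronous listener a moment to run before disposing everything.
                Thread.Sleep(1000);
            }
        }
    }
}
```

Publisher and subscriber share one connection here only to keep the sketch short; in practice they would normally live in separate processes, each with its own connection and session.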

ActiveMQ supports both durable and non-durable publish-subscribe. Durable subscription is suggested for highly reliable publish-subscribe where not even a single percent of loss is permissible. The flip side is that the broker has to retain messages for subscribers that are offline, so its memory and storage use can keep growing hour by hour until those messages are consumed. Use it in areas where we need to make sure the data is never lost.

Non-durable publish-subscribe suits cases where you cannot spare too much memory and performance but can live with some amount of data loss, for example missing one cricket score update every 3-4 hours.
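In NMS terms, the difference between the two kinds of subscription shows up in how the consumer is created. The following is a rough sketch, assuming the Apache.NMS and Apache.NMS.ActiveMQ assemblies and a local broker; the class name SubscriptionSketch and the subscription name "sample-subscription" are hypothetical, while the topic name and client id are borrowed from the sample program.

```csharp
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

// Illustrative only: class, method and subscription names are assumptions.
static class SubscriptionSketch
{
    // Non-durable: the broker does not keep messages for this consumer while it is offline.
    static IMessageConsumer SubscribeNonDurable(ISession session, string topicName)
    {
        return session.CreateConsumer(session.GetTopic(topicName));
    }

    // Durable: the broker retains messages for this subscription while the subscriber is disconnected.
    static IMessageConsumer SubscribeDurable(ISession session, string topicName, string subscriptionName)
    {
        // Arguments: topic, subscription name, message selector (none), noLocal flag.
        return session.CreateDurableConsumer(session.GetTopic(topicName), subscriptionName, null, false);
    }

    static void Main()
    {
        IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616");
        using (IConnection connection = factory.CreateConnection())
        {
            // A durable subscription is identified by the client id plus the subscription name,
            // so the client id is set explicitly before the connection is used.
            connection.ClientId = "clientidsample1";
            connection.Start();

            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
            using (IMessageConsumer consumer = SubscribeDurable(session, "mysampletopic", "sample-subscription"))
            {
                consumer.Listener += message => Console.WriteLine("Got a message");
                Console.ReadLine(); // keep listening until Enter is pressed
            }
        }
    }
}
```

Swapping SubscribeDurable for SubscribeNonDurable in Main gives the non-durable behaviour described above: anything published while the program is not running is simply never seen by it.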

Structure of the Program:

What I have tried to do is wrap the publisher and subscriber in separate classes named NonDurableTopicPublisher and NonDurableTopicSubscriber. The library also declares a delegate, which is invoked every time a message is received. If you want more details about delegates, see the delegates and events article.
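The delegate mechanism itself can be shown without a broker. The sketch below is only a guess at the general shape of such a wrapper, not the library's actual source: the delegate signature is taken from the handler used in the console program, whereas the class DemoSubscriber and its HandleIncoming method are hypothetical stand-ins for the code that would run when NMS delivers a message.

```csharp
using System;

// Signature matches the handler in the console program: one string per message.
public delegate void MessageReceivedDelegate(string message);

// Hypothetical stand-in for a subscriber wrapper; not the attached library's code.
public class DemoSubscriber
{
    public event MessageReceivedDelegate OnMessageReceived;

    // In a real wrapper this would be called from the NMS consumer's listener
    // with the text of the incoming message.
    public void HandleIncoming(string text)
    {
        // Copy the event to a local so a concurrent unsubscribe cannot null it mid-call.
        MessageReceivedDelegate handler = OnMessageReceived;
        if (handler != null)
        {
            handler(text);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        DemoSubscriber subscriber = new DemoSubscriber();
        subscriber.OnMessageReceived += delegate(string message) { Console.WriteLine(message); };

        // Simulates a message pushed by the broker.
        subscriber.HandleIncoming("Test message 0");
    }
}
```

Running this prints Test message 0. Exposing a plain string delegate keeps the calling code free of any NMS types, which is presumably why the wrapper classes take this approach.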

All we need to do is download ActiveMQ and run it from activemqinstallationfolder\bin\activemq.bat. This starts the ActiveMQ process. Please make sure you have the necessary JDK version installed, otherwise it won’t run smoothly. If ActiveMQ was installed as a Windows service and configured for automatic start, it will already be running.

The above classes have been written as a .NET class library, and a small console application has been created to use them. If you run the program, it prints Test message 0 through Test message 99, which are published by the publisher and received by the subscriber in the same function.

using System;
using ActiveMqUtils;

namespace ActiveMq
{
    class Program
    {
        static void Main(string[] args)
        {
            string topicname = "mysampletopic";
            string activemqbrokerurl = "tcp://localhost:61616";
            string strclientid = "clientidsample1";

            // Subscribe to the topic first so that no published message is missed
            NonDurableTopicSubscriber mysubscriber = new NonDurableTopicSubscriber(topicname, activemqbrokerurl, strclientid);
            mysubscriber.OnMessageReceived += new MessageReceivedDelegate(subscriber_OnFenceMessageReceived);

            // Publish 100 messages to the subscribed topic
            NonDurableTopicPublisher mypublisher = new NonDurableTopicPublisher(topicname, activemqbrokerurl);
            for (int i = 0; i < 100; i++)
            {
                mypublisher.SendMessage(String.Format("Test message {0}", i));
            }

            // Wait for Enter so that messages still in flight can be delivered before exit
            Console.ReadLine();
        }

        // Invoked by the subscriber for every message received on the topic
        static void subscriber_OnFenceMessageReceived(string message)
        {
            Console.WriteLine(message);
        }
    }
}

In the attached library, NonDurableTopicPublisher connects to the broker URI provided and posts messages to the topic specified. Note that it can publish to any broker, whether it runs on the same machine or on a different machine on the network.

The only catch is security: a broker port that is reachable from untrusted hosts is an invitation to intruders. The safest option is to open port 61616 in the firewall only to the IP addresses that you trust.

NonDurableTopicSubscriber connects to the broker and waits for updates from ActiveMQ. The moment a message is received, the subscribing code is notified through the delegate MessageReceivedDelegate.

Summary:

Basically, this solves the problem of instantaneous notifications, which many critical programs in our day-to-day enterprise systems require. One caveat is that the performance of such applications needs to be closely monitored and measured to obtain the full benefit of ActiveMQ. In my case I am able to push more than 1000 messages a minute, and my load keeps growing.

Try the attached sample program for yourself. The sample is built using VS 2010.