2012/08/20

Getting Started with Message Broker using C#

  • Sajini De Silva
  • Associate Technical Lead - WSO2
Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to relevant product page for more up-to-date product information and resources.

Introduction

WSO2 Message Broker (MB) supports JMS, WS-Eventing, Amazon SQS and AMQP. Because MB speaks AMQP, a .Net program can send a message to a queue in MB, and either a .Net client or a Java client can retrieve it. This tutorial shows how to implement a .Net client that sends messages to a queue, and how to retrieve those messages with a .Net client and with a Java client. When you run these programs, start the consumer program first and then the producer program.

Applies to

WSO2 MB 1.0.2
AMQP 1-10

Table of Contents

  • Exchanges, Queues and Bindings in AMQP.
  • Send a message using a .Net client.
  • Retrieve a message using a .Net client.
  • Retrieve a message using a Java client.

Exchanges, Queues and Bindings in AMQP

Like any other messaging protocol, AMQP uses producers and consumers to send and receive messages. The producer creates a message and sends it to a destination. The consumer receives that message and processes it. The message broker is responsible for delivering each message from the producer to the consumer.

AMQP uses two components to complete this task: exchanges and queues. The producer publishes messages to an exchange, and the consumer consumes messages from a queue. Bindings connect these two components to each other. The publisher and the consumer find each other through the exchange name: usually either the publisher or the consumer creates the exchange and makes its name public. After a queue is declared, it must be attached to the exchange. The broker then has to match the messages received by the exchange to the queue, which it does using bindings. Typically the queue binding is created on the client side. The following diagram illustrates this.
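The relationship between exchange, binding and queue can be sketched with a small in-memory model. This is only an illustration of direct-exchange matching, not a broker client: the class DirectExchangeSketch and its methods bind and publish are hypothetical names made up for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of an AMQP direct exchange (hypothetical, not a real client API).
class DirectExchangeSketch {
    // binding key -> queues bound with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Attach a queue to the exchange under the given binding key.
    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Deliver the message to every queue whose binding key exactly equals
    // the routing key. Returns the number of queues that received it.
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> testQueue = new ArrayDeque<>();
        exchange.bind(testQueue, "test-queue");

        exchange.publish("test-queue", "This is a Test 0"); // key matches, message is queued
        exchange.publish("other-key", "not routed");        // no binding with this key

        System.out.println(testQueue);
    }
}
```

In this model a message whose routing key matches no binding is simply not queued anywhere, which is why the queue has to be declared and bound before messages are published to it.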

Send a message using a .Net client

To run this code sample, download RabbitMQ.Client.dll and add it as a reference in your .Net project. You can download the DLL from https://www.rabbitmq.com/dotnet.html.

.Net Publisher:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client; // [1]

namespace RabbitMQ
{
    class Publisher
    {
        static void Main(string[] args)
        {
            Publisher p = new Publisher();

            // Send a few numbered test messages (the count here is arbitrary).
            for (int i = 0; i < 5; i++)
            {
                p.PublishMessage("This is a Test " + i);
                Console.WriteLine("Sent Message " + i);
            }

            Console.ReadLine();
        }

        public void PublishMessage(string message)
        {
            // Set up the connection with the message broker.
            ConnectionFactory factory = new ConnectionFactory(); // [2]
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare the exchange for the publisher. Here the exchange type is direct.
                    ch.ExchangeDeclare("amq.direct", "direct"); // [3]

                    // Publish the message, using the queue name as the routing key.
                    ch.BasicPublish("amq.direct", "test-queue", null, Encoding.UTF8.GetBytes(message)); // [4]
                }
            }
        }
    }
}

[1] Import the RabbitMQ.Client namespace from the referenced RabbitMQ.Client.dll.

[2] Create a connection to the MB. Here we set the VirtualHost to “/carbon”. If it is not specified, the connection goes to the default VirtualHost, “test”.

[3] Declare the exchange for the publisher. The queue is interested in messages from this exchange. Here the exchange type is direct, which means a message goes to a queue only if the binding key exactly matches the routing key of the message.

[4] Publish the message. Here we need to specify the exchange name and the routing key. In this example the routing key is the same as the queue name, “test-queue”. The next parameter is IBasicProperties; keep it null for this example. The last parameter is the message body.
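The message body in step [4] travels as a byte array, so the publisher and any consumer that reads the raw bytes have to agree on the character encoding. The publisher above uses UTF-8; the sketch below shows the matching encode and decode steps in Java. MessageBodyCodec is a hypothetical helper name used only for this illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the UTF-8 round trip for a message body.
class MessageBodyCodec {
    // Counterpart of Encoding.UTF8.GetBytes(message) in the .Net publisher.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Counterpart of Encoding.UTF8.GetString(body) in the .Net consumer.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("This is a Test 0");
        System.out.println(body.length + " bytes on the wire");
        System.out.println(decode(body));
    }
}
```

Naming the charset explicitly on both sides avoids depending on the platform default, which can differ between the machine that publishes and the machine that consumes.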

Retrieve the message using a .Net client

QueueConsumer:

This consumer program fetches messages from the queue and displays them in the console. To run this code sample, add RabbitMQ.Client.dll as a reference in your .Net project, as described above.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client; // [5]

namespace QueueConsumer
{
    class QueueConsumer
    {
        static void Main(string[] args)
        {
            QueueConsumer qConsumer = new QueueConsumer();
            qConsumer.getMessage();
        }

        public void getMessage()
        {
            // Set up the connection with the message broker.
            ConnectionFactory factory = new ConnectionFactory(); // [6]
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare a queue to retrieve messages.
                    ch.QueueDeclare("test-queue", true, false, false, null); // [7]

                    // Create the binding between the queue and the exchange.
                    ch.QueueBind("test-queue", "amq.direct", "test-queue"); // [8]

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch); // [9]
                    ch.BasicConsume("test-queue", false, consumer); // [10]

                    while (true) // [11]
                    {
                        try
                        {
                            // Block until the next message arrives.
                            RabbitMQ.Client.Events.BasicDeliverEventArgs e =
                                (RabbitMQ.Client.Events.BasicDeliverEventArgs)
                                consumer.Queue.Dequeue();
                            byte[] body = e.Body;
                            string message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(message);

                            // Acknowledge this single message (noAck is false above).
                            ch.BasicAck(e.DeliveryTag, false);
                        }
                        catch (OperationCanceledException ex)
                        {
                            Console.WriteLine(ex);
                            break;
                        }
                    }
                }
            }
        }
    }
}

[5] First import the DLL as mentioned above.

[6] Create a connection with the message broker.

[7] Declare a queue to receive messages. The first parameter is the queue name. The second parameter is durable; set it to true. The next two parameters are exclusive and auto-delete; keep both false. The last parameter is an IDictionary of arguments; keep it null.

[8] Here we create the binding between the queue and the exchange. A binding is simply a relationship between an exchange and a queue. The first parameter is the queue that we want to bind. The next parameter is the exchange that we are binding to the queue, and the last parameter is the routing key. Here the routing key is the same as the queue name.

[9] Create a consumer to consume the messages from the queue.

[10] Start consuming from the queue with the consumer created in the previous step. The first parameter is the name of the queue, “test-queue”. The next parameter is noAck; keep it false, so each message has to be acknowledged explicitly, which the loop does with BasicAck. The last parameter is the consumer object created in the previous step.

[11] In this infinite while loop, we listen for messages, decode each one, print its body in the console and acknowledge it.
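Because noAck is false in step [10], a delivered message stays pending on the broker side until the consumer acknowledges it. The sketch below models that bookkeeping in memory. It is a simplified illustration under assumed semantics (unacknowledged messages return to the queue when the consumer goes away), and ManualAckSketch with all of its methods is a hypothetical name, not part of the RabbitMQ client or WSO2 MB.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy model of manual acknowledgement (hypothetical, not a real client API).
class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> message that was delivered but not yet acknowledged
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void enqueue(String message) {
        ready.addLast(message);
    }

    // Hand out the next message and remember it until it is acknowledged.
    // Returns the delivery tag, or -1 when no message is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    String body(long tag) {
        return unacked.get(tag);
    }

    // Plays the role of BasicAck(deliveryTag, false): forget one message.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Assumed behaviour when the consumer disappears: every unacknowledged
    // message goes back to the front of the queue in its original order.
    void consumerLost() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.enqueue("This is a Test 0");
        queue.enqueue("This is a Test 1");

        long first = queue.deliver();
        queue.ack(first);   // processed and acknowledged
        queue.deliver();    // delivered but never acknowledged
        queue.consumerLost();

        System.out.println("messages waiting again: " + queue.readyCount());
    }
}
```

Acknowledging only after the message has been printed, as the consumer loop does, means a crash in the middle of processing leaves the message available for another delivery.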

Retrieve the message using a Java client

ConsumeClient:

This consumer class fetches messages from the queue and displays them in the console.

Here we use the Qpid JNDI provider to obtain the JMS connection factory, queues and so on. Using the JNDI provider helps keep the client portable and simple.

import javax.naming.InitialContext;
import javax.jms.*;
import java.util.Properties;

public class ConsumeClient {

    public void consumeMessage() throws Exception {

        // JNDI properties describing the broker connection and the queue. [12]
        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://127.0.0.1:5672'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("queue.test-queue", "test-queue");

        InitialContext initialContext = new InitialContext(initialContextProperties);
        ConnectionFactory queueConnectionFactory
                = (ConnectionFactory) initialContext.lookup("qpidConnectionfactory");

        Connection queueConnection = queueConnectionFactory.createConnection(); // [13]
        queueConnection.start();

        Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // [14]
        Destination destination = (Destination) initialContext.lookup("test-queue"); // [15]

        MessageConsumer messageConsumer = queueSession.createConsumer(destination); // [16]

        // Print messages until the queue has nothing more to deliver.
        while (true) {
            Message msg = messageConsumer.receiveNoWait(); // [17]
            if (msg == null) {
                System.out.println("no messages to print");
                break;
            }
            System.out.println(msg);
        }

        messageConsumer.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    }

    public static void main(String[] args) throws Exception {
        ConsumeClient sendConsumeClient = new ConsumeClient();
        sendConsumeClient.consumeMessage();
    }
}

[12] Create a Properties object that holds the JNDI settings and set up the connection with the MB.

[13] Create a connection using the connection factory.

[14] Create a JMS session using the connection.

[15] Look up the queue from JNDI.

[16] Create a consumer to get the messages.

[17] Receive a message from the queue. Here the consumer does not wait for the next message: receiveNoWait returns null immediately when no message is available, and the loop ends.
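The effect of step [17] can be seen without a broker. In the sketch below a java.util.concurrent.BlockingQueue stands in for the JMS consumer: poll() plays the role of receiveNoWait(), and poll with a timeout plays the role of the JMS receive(long timeout) method. ReceiveModesSketch and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// A BlockingQueue stands in for a JMS MessageConsumer (illustration only).
class ReceiveModesSketch {
    // Stop at the first empty poll, like the receiveNoWait() loop above.
    static int drainNoWait(BlockingQueue<String> queue) {
        int count = 0;
        while (queue.poll() != null) {
            count++;
        }
        return count;
    }

    // Wait up to timeoutMillis for each message and stop after a quiet period.
    static int drainWithTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        int count = 0;
        try {
            while (queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null) {
                count++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // A producer that publishes shortly after the consumer has started.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.put("late message");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        System.out.println("no-wait loop received: " + drainNoWait(queue));
        System.out.println("timed loop received: " + drainWithTimeout(queue, 1000));
        producer.join();
    }
}
```

A no-wait loop suits draining messages that are already in the queue. If the Java consumer is started before the publisher, a blocking or timed receive is the safer choice, since the no-wait loop would exit before any message arrives.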

Conclusion

Since WSO2 Message Broker supports the AMQP protocol, it is possible to access the broker using a C# API. The publisher and the consumer do not have to use the same API: it is possible to publish a message using a .Net client and retrieve that message using a Java client, and vice versa.

References

[1] https://www.rabbitmq.com/dotnet.html

Author

Sajini De Silva, Trainee Software Engineer, WSO2 Inc

 
