2011/09/09

Messaging and Eventing in SOA

  • Amila Suriarachchi
  • Architect, Member, Management Committee - Data Technologies - WSO2

Introduction

Messaging is a key part of distributed computing in which communication happens asynchronously. Typically, store-and-forward messaging is used: intermediate brokers store the messages and forward them to their receivers. An event represents a state change in a software system, and processing such events can derive new facts about the system. Topic-based publish and subscribe is used to send and receive events, while complex event processing techniques derive further events from the original ones.

WSO2 Message Broker (WSO2 MB) supports both messaging and eventing. First, this article gives an overview of the component architecture. Second, it describes the two main features, messaging and eventing, with relevant sample code [1].

Applies To

WSO2 MB 1.0.0

Architecture Overview

[Figure: Architecture overview]

The diagram above shows the composition of WSO2 MB. It consists of three main components: EventBroker, MessageBox and Qpid. These components provide APIs for different types of clients. The WS-Eventing [1] component implements the WS-Eventing specification, so clients can invoke it using WS-Eventing-compatible web service calls. Similarly, the SQS component implements the SQS (Simple Queue Service [2]) specification, so any SQS client can use it. WSO2 MB is packaged with an embedded Apache Qpid broker, which implements the Advanced Message Queuing Protocol (AMQP). Therefore, any JMS or AMQP client can also talk to WSO2 MB directly. As shown in the diagram, both the SQS and WS-Eventing implementations use the embedded Qpid instance as the underlying broker. This makes it possible to publish messages through the JMS API and receive them with a WS-Eventing client, and vice versa.

User authentication happens at different components. The WS-Eventing service is a Carbon admin service, so its authentication happens at the Carbon authentication handler. For SQS, a dedicated SQS authentication handler performs the authentication. For direct invocations of the Qpid server, authentication happens at a Qpid authentication plug-in. When the EventBroker and MessageBox components invoke the Qpid instance, they use a trust delegation technique based on a secret key shared between the Qpid instance and the other Carbon components. At startup, the Qpid instance generates a key and shares it with the other components. When these Carbon components invoke Qpid on behalf of a particular user, they set the shared key as the password. The authentication plug-in checks for this special key and authenticates any user who presents it as the password. Authorization of users for topics and queues happens at the Qpid authorization plug-in, while SQS-specific authorization happens at the MessageBox component.
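The trust delegation step can be illustrated with a small, self-contained sketch. It is not the WSO2 MB or Qpid plug-in code: the class name, the in-memory user store and the plain-text passwords are hypothetical simplifications, and only the decision rule (accept the shared key as a password for any user) is taken from the description above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of shared-key trust delegation; not WSO2 MB or Qpid code. */
final class SharedKeyAuthSketch {

    // Key generated once at startup and handed to the trusted components
    private final String sharedKey;
    // Assumption: a trivial in-memory user store with plain-text passwords
    private final Map<String, String> userPasswords = new HashMap<>();

    SharedKeyAuthSketch() {
        byte[] bytes = new byte[32];
        new SecureRandom().nextBytes(bytes);
        this.sharedKey = Base64.getEncoder().encodeToString(bytes);
    }

    /** Trusted components obtain the key through this accessor. */
    String sharedKey() {
        return sharedKey;
    }

    void addUser(String user, String password) {
        userPasswords.put(user, password);
    }

    /** Accepts either the shared key (delegated trust) or the user's own password. */
    boolean authenticate(String user, String password) {
        if (user == null || password == null) {
            return false;
        }
        if (sameValue(password, sharedKey)) {
            return true; // a trusted component is acting on behalf of this user
        }
        String stored = userPasswords.get(user);
        return stored != null && sameValue(password, stored);
    }

    // Compares the two values without stopping at the first differing byte
    private static boolean sameValue(String a, String b) {
        return MessageDigest.isEqual(a.getBytes(StandardCharsets.UTF_8),
                b.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        SharedKeyAuthSketch auth = new SharedKeyAuthSketch();
        auth.addUser("admin", "admin");
        System.out.println("Own password accepted: " + auth.authenticate("admin", "admin"));
        System.out.println("Shared key accepted: " + auth.authenticate("admin", auth.sharedKey()));
        System.out.println("Wrong password accepted: " + auth.authenticate("admin", "guess"));
    }
}
```

Because the shared key authenticates any user, the sketch only makes sense when the key never leaves the trusted components, which is why it is generated randomly at startup rather than configured.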

The administrative console of WSO2 MB allows users to assign role-based authorization to topics and user-based authorization to queues. It also supports hierarchical roles and lets users manage subscriptions, message boxes and queues. The following diagram shows the details of the WSO2 MB components.

[Figure: Architecture detail]

Messaging

Messaging is a key aspect of distributed computing. This enables sending messages between different applications asynchronously. One application can send the messages to the broker and other applications can receive messages from the broker. WSO2 MB supports two APIs for handling messages.

JMS

This is a standard Java-based API for sending and receiving messages. There are several ways to use the JMS client API for this. This article shows two simple programs, one using QueueReceiver and one using QueueSender.

Receiver

        Properties initialContextProperties = new Properties();
	initialContextProperties.put("java.naming.factory.initial",
		"org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
	String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
	initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
	initialContextProperties.put("queue.myQueue", "myQueue");


	try {
	    InitialContext initialContext = new InitialContext(initialContextProperties);
	    QueueConnectionFactory queueConnectionFactory
		    = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
	    QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
	    queueConnection.start();

	    QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

	    // Receive message
	    Queue queue = (Queue) initialContext.lookup("myQueue");
	    QueueReceiver queueReceiver = queueSession.createReceiver(queue);

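	    // Flag that guards against a notification sent before the main thread starts waiting
	    final boolean[] received = {false};

	    MessageListener messageListener = new MessageListener() {

		public void onMessage(Message message) {
		    try {
		        TextMessage textMessage = (TextMessage) message;
		        System.out.println("Got the message ==> " + textMessage.getText());
		        synchronized (this) {
		            received[0] = true;
		            this.notify();
		        }
		    } catch (JMSException e) {
		        e.printStackTrace();
		    }
		}
	    };

	    queueReceiver.setMessageListener(messageListener);

	    // Wait on the listener until it reports the first message
	    synchronized (messageListener) {
		try {
		    while (!received[0]) {
		        messageListener.wait();
		    }
		} catch (InterruptedException e) {
		    Thread.currentThread().interrupt();
		}
	    }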

	    queueReceiver.close();
	    queueSession.close();
	    queueConnection.stop();
	    queueConnection.close();

	} catch (NamingException e) {
	    e.printStackTrace();
	} catch (JMSException e) {
	    e.printStackTrace();
	}

First, it creates an initial context object with the properties required to look up the objects. Qpid uses AMQP, and the connection string passed to the client connection factory contains the username, password, virtual host to connect to, and the server address and port. Then it creates a QueueConnection to communicate with the server. After that, it creates an auto-acknowledged QueueSession, which is used to create the QueueReceiver. The QueueReceiver listens to myQueue using a MessageListener. The receiver program waits on the MessageListener until it receives the first message.
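To make the parts of the connection string explicit, the following sketch assembles it from its components. The class and method names are hypothetical; the layout simply mirrors the string used in the sample, and the sketch does not escape special characters in any of the values.

```java
/** Hypothetical helper that spells out the parts of the sample's connection string. */
final class QpidConnectionUrlSketch {

    static String connectionUrl(String user, String password, String clientId,
                                String virtualHost, String host, int port) {
        // amqp://<user>:<password>@<clientId>/<virtualHost>?brokerlist='tcp://<host>:<port>'
        return "amqp://" + user + ":" + password + "@" + clientId + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    public static void main(String[] args) {
        // Reproduces the string used in the receiver and sender samples
        System.out.println(connectionUrl("admin", "admin", "clientID", "carbon", "localhost", 5672));
    }
}
```

Building the string this way keeps credentials and broker addresses out of the literal, so they can come from configuration instead.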

Message Sender

        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("queue.myQueue", "myQueue");

        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            QueueConnectionFactory queueConnectionFactory
                    = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");

            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();

            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

            // Create the text message to send
            TextMessage textMessage = queueSession.createTextMessage("My test message");

            // Send message
            Queue queue = (Queue) initialContext.lookup("myQueue");
            QueueSender queueSender = queueSession.createSender(queue);
            queueSender.send(textMessage);

            // Housekeeping
            queueSender.close();
            queueSession.close();
            queueConnection.stop();
            queueConnection.close();

        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }

The message sender also creates a QueueSession, then creates a QueueSender and uses it to send a TextMessage to the broker.

SQS

SQS is a web service that can be used to manage queues and to send messages to and receive messages from them. Unlike a JMS queue, an SQS queue has a visibility timeout, which handles network reliability issues. When a message is retrieved from an SQS queue, it is not visible to other users (i.e. other users cannot retrieve it) during the visibility timeout period. If the message is deleted within that period, it is removed from the queue. If the client sends no delete request within that period, the message is put back in the queue so that everyone can access it again. The following code shows how to access an SQS queue using the relevant APIs.

Obtaining the AccessKeyID and the SecretAccessKey

SQS uses a symmetric-key, signature-based authentication mechanism. To access an SQS service, users need an accessKeyID and a secretAccessKey. Users can obtain these keys either from the administrator or by logging in to the WSO2 MB administrative console themselves. The following code shows how to get the keys programmatically using the respective Carbon admin service calls.

            System.setProperty("javax.net.ssl.trustStore",
                    "/home/amila/projects/wso2mb-1.0.0/repository/resources/security/wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
            //first login to the server
            String servicesString = "https://localhost:9443/services/";
            AuthenticationAdminServiceStub stub =
                    new AuthenticationAdminServiceStub(servicesString + "AuthenticationAdmin");
            stub._getServiceClient().getOptions().setManageSession(true);
            stub.login("admin", "admin", NetworkUtils.getLocalHostname());

            ServiceContext serviceContext = stub._getServiceClient().getLastOperationContext().getServiceContext();
            String sessionCookie = (String) serviceContext.getProperty(HTTPConstants.COOKIE_STRING);

            MessageBoxAdminServiceStub messageBoxAdminServiceStub = new MessageBoxAdminServiceStub(servicesString + "MessageBoxAdminService");
            messageBoxAdminServiceStub._getServiceClient().getOptions().setManageSession(true);
            messageBoxAdminServiceStub._getServiceClient().getOptions().setProperty(HTTPConstants.COOKIE_STRING, sessionCookie);

            SQSKeys sqsKeys = messageBoxAdminServiceStub.getSQSKeys("admin");
            accessKeyID = sqsKeys.getAccessKeyId();
            secretAccessKey = sqsKeys.getSecretAccessKeyId();

First, users have to authenticate to WSO2 MB using the AuthenticationAdminService. WSO2 Carbon keeps the authenticated user's details in the HTTP session object at the server, so the client needs to get the HTTP cookie of that session. Once authenticated, the client can invoke the MessageBoxAdminService to obtain the required information. It must send the same HTTP cookie with that invocation in order to use the authenticated session.

Accessing Queues

SQS queues can be accessed using generated clients for the two SQS services, QueueService and MessageQueue. To authenticate properly, every request must carry SOAP headers signed with the secret access key. The following code adds those authentication headers.

        OMFactory factory = OMAbstractFactory.getOMFactory();
        OMNamespace awsNs = factory.createOMNamespace("http://security.amazonaws.com/doc/2007-01-01/", "aws");
        OMElement accessKeyId = factory.createOMElement("AWSAccessKeyId", awsNs);
        accessKeyId.setText(accessKeyID);
        OMElement timestamp = factory.createOMElement("Timestamp", awsNs);
        timestamp.setText(new Date().toString());
        OMElement signature = factory.createOMElement("Signature", awsNs);

        try {
            signature.setText(calculateRFC2104HMAC(action + timestamp.getText(), secretAccessKey));
        } catch (SignatureException e) {
            // do not swallow the failure silently: the request would go out unsigned
            e.printStackTrace();
        }

        queueServiceStub._getServiceClient().removeHeaders();

        queueServiceStub._getServiceClient().addHeader(accessKeyId);
        queueServiceStub._getServiceClient().addHeader(timestamp);
        queueServiceStub._getServiceClient().addHeader(signature);
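The calculateRFC2104HMAC helper called above is not listed in this article. A minimal sketch of such a helper follows, assuming HMAC-SHA1 (RFC 2104) over UTF-8 input with a Base64-encoded result. The algorithm, the encoding and the enclosing class name are assumptions; they must match whatever the server uses to verify the signature.

```java
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.SignatureException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical signing helper; the algorithm and encoding are assumptions. */
final class HmacSignatureSketch {

    private static final String HMAC_SHA1 = "HmacSHA1";

    /** Signs the data with the key and returns the Base64-encoded HMAC. */
    static String calculateRFC2104HMAC(String data, String key) throws SignatureException {
        try {
            Mac mac = Mac.getInstance(HMAC_SHA1);
            mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), HMAC_SHA1));
            byte[] rawHmac = mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(rawHmac);
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            // Surface both failure modes as the exception type the caller already handles
            throw new SignatureException("Failed to generate HMAC", e);
        }
    }

    public static void main(String[] args) throws SignatureException {
        // The signed text is the action name followed by the timestamp, as in the header code
        String action = "CreateQueue";
        String timestamp = new java.util.Date().toString();
        System.out.println(calculateRFC2104HMAC(action + timestamp, "mySecretAccessKey"));
    }
}
```

Because the timestamp is part of the signed text, the client has to send exactly the same timestamp string in the Timestamp header that it used when computing the signature.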

The following code shows how to handle SQS queues with the client API.

            // first create a queue
            QueueServiceStub queueServiceStub = new QueueServiceStub("http://localhost:9763/services/QueueService");
            CreateQueue createQueue = new CreateQueue();
            createQueue.setQueueName(QUEUE_NAME);
            createQueue.setDefaultVisibilityTimeout(new BigInteger(DEFAULT_VISIBILITY_TIMEOUT));
            // add security soap header for action CreateQueue
            addSoapHeader(queueServiceStub, "CreateQueue");
            CreateQueueResponse createQueueResponse = queueServiceStub.createQueue(createQueue);

            String queueID = createQueueResponse.getCreateQueueResult().getQueueUrl().toString();
            System.out.println("Queue created with URL ==>" + queueID);

            //send a message to the queue
            MessageQueueStub messageQueueStub = new MessageQueueStub(queueID);
            SendMessage sendMessage = new SendMessage();
            sendMessage.setMessageBody("Test Send Message");
            addSoapHeader(messageQueueStub, "SendMessage");
            messageQueueStub.sendMessage(sendMessage);

            // receive the message back from the queue.
            ReceiveMessage receiveMessage = new ReceiveMessage();
            receiveMessage.setMaxNumberOfMessages(new BigInteger(MAX_NUMBER_OF_MESSAGES));
            receiveMessage.setVisibilityTimeout(new BigInteger("2000"));
            addSoapHeader(messageQueueStub, "ReceiveMessage");
            ReceiveMessageResponse receiveMessageResponse = messageQueueStub.receiveMessage(receiveMessage);
            Message_type0[] message_type0s = receiveMessageResponse.getReceiveMessageResult().getMessage();
            if (message_type0s != null) {
                for (Message_type0 message_type0 : message_type0s) {
                    System.out.println("Received message ==> " + message_type0.getBody());
                }

                // collect the receipt handle of each received message
                String[] receiptHandlers = new String[message_type0s.length];
                for (int i = 0; i < message_type0s.length; i++) {
                    receiptHandlers[i] = message_type0s[i].getReceiptHandle();
                }
                // delete the received messages; skipped when nothing was received
                DeleteMessage deleteMessage = new DeleteMessage();
                deleteMessage.setReceiptHandle(receiptHandlers);
                addSoapHeader(messageQueueStub, "DeleteMessage");
                messageQueueStub.deleteMessage(deleteMessage);
            }

            //finally deleting the queue
            DeleteQueue deleteQueue = new DeleteQueue();
            addSoapHeader(messageQueueStub, "DeleteQueue");
            messageQueueStub.deleteQueue(deleteQueue);

First, it creates an SQS queue in WSO2 MB. Every SQS queue has a unique ID, which is used for all subsequent access to the queue, so the CreateQueueResponse message contains this ID for the newly created queue. The client then uses the MessageQueue service stub with the queue ID to perform the other operations. After creating the client stub, it sends messages to the queue using the sendMessage operation. Similarly, it retrieves messages using the receiveMessage operation. When messages are retrieved, the server returns a receipt handle for each message. When deleting the messages, the client must send this receipt handle with the delete request so that the server can relate the request to the message in the queue. Finally, it deletes the queue using the deleteQueue operation.
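The visibility timeout and receipt handling described in this section can be modelled with a small in-memory queue. This is only an illustration of the semantics, not the MessageBox implementation: all names are hypothetical, time is passed in explicitly to keep the example deterministic, and rejecting a delete that uses an outdated handle or arrives after the timeout is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative in-memory model of an SQS-style visibility timeout; not WSO2 MB code. */
final class VisibilityTimeoutQueueSketch {

    /** A received message together with the handle needed to delete it. */
    static final class Received {
        final String receiptHandle;
        final String body;

        Received(String receiptHandle, String body) {
            this.receiptHandle = receiptHandle;
            this.body = body;
        }
    }

    private static final class Entry {
        final String body;
        long invisibleUntil;   // the message is hidden while now < invisibleUntil
        String receiptHandle;  // handle issued by the most recent receive

        Entry(String body) {
            this.body = body;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long visibilityTimeoutMillis;
    private long handleCounter;

    VisibilityTimeoutQueueSketch(long visibilityTimeoutMillis) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
    }

    void send(String body) {
        entries.add(new Entry(body));
    }

    /** Returns the first visible message and hides it for the timeout period, or null. */
    Received receive(long now) {
        for (Entry entry : entries) {
            if (now >= entry.invisibleUntil) {
                entry.invisibleUntil = now + visibilityTimeoutMillis;
                entry.receiptHandle = "handle-" + (++handleCounter);
                return new Received(entry.receiptHandle, entry.body);
            }
        }
        return null;
    }

    /** Removes the message if the handle is current and the timeout has not expired. */
    boolean delete(String receiptHandle, long now) {
        for (Iterator<Entry> it = entries.iterator(); it.hasNext(); ) {
            Entry entry = it.next();
            if (receiptHandle.equals(entry.receiptHandle) && now < entry.invisibleUntil) {
                it.remove();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        VisibilityTimeoutQueueSketch queue = new VisibilityTimeoutQueueSketch(2000);
        queue.send("Test Send Message");

        Received first = queue.receive(0);
        System.out.println("Hidden while in flight: " + (queue.receive(1000) == null));

        // No delete arrived in time, so the message becomes visible again
        Received second = queue.receive(2000);
        System.out.println("Outdated handle rejected: " + !queue.delete(first.receiptHandle, 2500));
        System.out.println("Deleted in time: " + queue.delete(second.receiptHandle, 2500));
        System.out.println("Queue empty: " + (queue.receive(10000) == null));
    }
}
```

The second receive shows why the delete request matters: a consumer that fails before deleting a message does not lose it, because the message reappears for other consumers once the timeout expires.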

Eventing

Any operation that happens in a software system causes a change of state, and these state changes can produce events. Details of an event can be communicated to other systems by sending them as a message. An intermediate broker can publish these messages to any other system that is interested in them. WSO2 MB supports topic-based subscriptions and publishers for this requirement. As in the messaging case, WSO2 MB supports both JMS and WS-Eventing clients.

JMS

There are several ways to subscribe to a topic and receive messages from it using the JMS API. This article shows an example of a durable subscription using TopicSubscriber and TopicPublisher.

Subscriber

        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("topic.myTopic", "myTopic");

        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            TopicConnectionFactory topicConnectionFactory =
                    (TopicConnectionFactory) initialContext.lookup("qpidConnectionfactory");
            TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = (Topic) initialContext.lookup("myTopic");
            TopicSubscriber topicSubscriber =
                    topicSession.createDurableSubscriber(topic, "mySubscription");

            MessageListener messageListener = new MessageListener() {

                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Got the message ==> " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };

            topicSubscriber.setMessageListener(messageListener);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}

            topicSubscriber.close();
            topicSession.close();
            topicConnection.stop();
            topicConnection.close();

        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }

As in the earlier messaging sample, it first creates an initial context and gets the TopicConnectionFactory from it. Then it creates a TopicSession and uses that to create a durable subscription. Apache Qpid also handles subscriptions using queues at the server. When a durable subscription is created, Qpid creates a durable queue to hold the messages published for that subscription. Therefore, the subscription also receives the messages that were published while the subscribing client was not connected to the server.

Publisher
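The queue-per-subscription idea behind durable subscriptions can be shown with a small in-memory model. This is a sketch of the concept only, not how Qpid is implemented, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Illustrative model of a topic that keeps one queue per durable subscription. */
final class DurableTopicSketch {

    // Subscription name -> messages waiting for that subscription
    private final Map<String, Queue<String>> subscriptionQueues = new LinkedHashMap<>();

    /** Creates the backing queue if the subscription does not exist yet. */
    void createDurableSubscription(String name) {
        subscriptionQueues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    /** Copies the message into every subscription queue, connected or not. */
    void publish(String message) {
        for (Queue<String> queue : subscriptionQueues.values()) {
            queue.add(message);
        }
    }

    /** Called when the subscriber connects: hands over everything stored so far. */
    List<String> drain(String name) {
        List<String> delivered = new ArrayList<>();
        Queue<String> queue = subscriptionQueues.get(name);
        if (queue != null) {
            String message;
            while ((message = queue.poll()) != null) {
                delivered.add(message);
            }
        }
        return delivered;
    }

    /** Removes the subscription and discards any messages still stored for it. */
    void unsubscribe(String name) {
        subscriptionQueues.remove(name);
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.createDurableSubscription("mySubscription");

        // Both messages are published while the subscriber is offline
        topic.publish("first");
        topic.publish("second");

        // The subscriber connects later and still receives both messages
        System.out.println(topic.drain("mySubscription"));
    }
}
```

In the JMS sample, closing the TopicSubscriber corresponds to going offline, not to unsubscribe() in this model; the stored messages remain until the subscription itself is removed.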

        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("topic.myTopic", "myTopic");

        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            TopicConnectionFactory topicConnectionFactory =
                    (TopicConnectionFactory) initialContext.lookup("qpidConnectionfactory");
            TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = (Topic) initialContext.lookup("myTopic");
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            TextMessage textMessage = topicSession.createTextMessage("Test Message");

            topicPublisher.publish(textMessage);


            topicPublisher.close();
            topicSession.close();
            topicConnection.stop();
            topicConnection.close();


        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }

The publisher follows the same steps and creates a TopicPublisher to publish messages to the topic.

WS-Eventing

WS-Eventing defines a web service API for an event source. Users can subscribe to the event source by providing an event sink URL. When an event occurs, the event source sends it to the event sink URL. WS-Eventing does not define an API for publishing messages, so WSO2 MB provides its own publish API in order to work as an event broker. WSO2 MB also comes with a broker client interface that can be used to invoke the EventBroker service. The following code shows how to use the broker client API to communicate with WSO2 MB.

Subscriber

    private AxisServer axisServer;
    private BrokerClient brokerClient;

    public void start() {
        try {
            // setting the keystore to access using https
            System.setProperty("javax.net.ssl.trustStore",
                    "/home/amila/projects/branch/carbon/3.2.0/products/mb/1.0.0/modules/distribution/product/target/wso2mb-1.0.0/repository/resources/security/wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

            // start a simple axis server and deploy a service to receive the message.
            this.axisServer = new AxisServer();
            this.axisServer.deployService(EventSinkService.class.getName());


            // give time to start the simple http server
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }

        } catch (AxisFault axisFault) {
            System.out.println("Can not start the server");
        }

    }

    public String subscribe() {
        // subscribes using the sample axis2 server created
        try {
            // initiate the broker client. this will authenticate with the back end service
            this.brokerClient =
                    new BrokerClient("https://localhost:9443/services/EventBrokerService", "admin", "admin");
            String subscriptionID = this.brokerClient.subscribe("myTopic",
                    "http://localhost:6060/axis2/services/EventSinkService/receive");
            return subscriptionID;
        } catch (BrokerClientException e) {
            e.printStackTrace();
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        } catch (AuthenticationExceptionException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void unsubscribe(String subscriptionID) {
        try {
            this.brokerClient.unsubscribe(subscriptionID);
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }

    public void stop() {
        try {
            this.axisServer.stop();
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    public static void main(String[] args) {

        Subscriber subscriber = new Subscriber();
        subscriber.start();
        String subscriptionId = subscriber.subscribe();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
        }
        subscriber.unsubscribe(subscriptionId);
        subscriber.stop();
    }

First, the subscriber starts an Axis2 server and deploys a service to receive the messages. Then it subscribes to the topic using that service as the event sink URL. After waiting some time to receive messages, it unsubscribes and stops the Axis2 server.

Publisher

    private BrokerClient brokerClient;

    public Publisher() {
        try {
            // setting the keystore to use with the https
            System.setProperty("javax.net.ssl.trustStore",
                    "/home/amila/projects/branch/carbon/3.2.0/products/mb/1.0.0/modules/distribution/product/target/wso2mb-1.0.0/repository/resources/security/wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");

            // creating a broker client instance this first authenticate with the back end
            this.brokerClient =
                    new BrokerClient("https://localhost:9443/services/EventBrokerService", "admin", "admin");

        } catch (AuthenticationExceptionException e) {
            e.printStackTrace();
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    public void publish() {
        try {
            // publish the message to the topic
            this.brokerClient.publish("myTopic", getOMElementToSend());
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    /**
     * generates a sample payload according to the EventSinkService
     * @return - sample payload
     */
    private OMElement getOMElementToSend() {
        OMFactory omFactory = OMAbstractFactory.getOMFactory();
        OMNamespace omNamespace = omFactory.createOMNamespace("http://wsevent.eventing.sample", "ns1");
        OMElement receiveElement = omFactory.createOMElement("receive", omNamespace);
        OMElement messageElement = omFactory.createOMElement("message", omNamespace);
        messageElement.setText("Test publish message");
        receiveElement.addChild(messageElement);
        return receiveElement;

    }

    public static void main(String[] args) {

        Publisher publisher = new Publisher();
        publisher.publish();
    }

The publisher publishes a message in the form expected by the EventSinkService.

Conclusion

Messaging and eventing play an important role in distributed computing. WSO2 MB, which addresses this space in the WSO2 Carbon platform, supports different APIs, namely JMS, SQS and WS-Eventing, to suit different purposes. This article described how to use each of these APIs from different types of clients.

References

[1] https://schemas.xmlsoap.org/ws/2004/08/eventing/

[2] https://aws.amazon.com/archives/Amazon-SQS/2317

Author

Amila Suriarachchi, Software Architect, WSO2 Inc.

 
