WSO2 ESB by Example - Processing a Long Message Backlog

By Rajika Kumarasiri
  • 7 Jun, 2012
  • Level:  Intermediate
This article describes how to use WSO2 ESB's JMS transport to process a large message backlog (a long message queue). It also demonstrates how to cache JMS objects (such as producers, consumers, sessions and connections) when deploying WSO2 ESB's JMS transport for high performance. The material is as self-explanatory as possible. Newcomers are encouraged to read the WSO2 ESB quick start guide first.
Rajika Kumarasiri
Senior Software Engineer
WSO2 Inc.

Applies To

WSO2 ESB 4.0.0 and above
Apache ActiveMQ 5.4.1

Introduction

The Java Message Service (JMS) API [1] still plays a major role in Enterprise Application Integration. It may still be the most popular messaging API on the market, despite the recent popularity of alternatives such as the Advanced Message Queuing Protocol (AMQP) [2]. JMS is widely used in enterprise integration not only because it is well established but also because of the features it supports. One key feature is the ability to persist messages when the other party (the consumer) is not online, which solves many practical integration problems.

For example, consider a factory that produces garments for the world market. The factory is located in Sri Lanka, while orders are processed at an office in the USA and then sent to the factory. Because the time difference between the two countries is nearly half a day, the two sites are rarely at work at the same time. How can employees in both countries go home at the end of a normal working day (5.00 pm, for example)? The USA office needs to send orders while it is open, and those order requests need to be persisted until the employees of the Sri Lankan factory come to work. Otherwise, the employees at one site would have to wait overnight. This simple example shows how the ability of JMS to persist messages for offline consumers avoids the problem.

WSO2 ESB has a production-ready JMS transport that can be used to implement a variety of enterprise integration requirements. This article specifically describes how to use that transport to process a long message backlog. Although these scenarios are simple and straightforward to implement with WSO2 ESB, they often play an important role as part of a larger integration scenario.

The first section describes the scenario in some detail. The sections that follow describe the configurations needed to run it. The article ends with a list of further reading that covers other ways of integrating systems with WSO2 ESB using the JMS transport.

Processing a Long Message Backlog

In many business scenarios, a large message backlog (for example, a message queue with 1000 messages) has to be processed periodically, perhaps once a day or once a week. This use case demonstrates how to deploy WSO2 ESB's JMS transport for high performance in such a situation. This is possible because the transport can cache JMS objects such as connections, sessions, consumers and producers. The caching is described using a general scenario in which WSO2 ESB's JMS transport processes a large (~1000) message backlog.

Use Case

This use case was implemented as a solution to a real-world problem. An order processing system receives a large batch of order requests (~1000 messages), and those requests must be processed one by one at a later time (generally after a couple of days). The orders must be persisted and kept safe until the order processing system consumes them. When the order processing system comes online (after a couple of days), the persisted messages are read and processed.

Solution

The batch of order request messages is kept on a JMS queue that resides on a JMS broker. For demonstration purposes this article uses the Apache ActiveMQ JMS broker. WSO2 ESB acts as a consumer of the order requests, and once the messages are available they are processed in one of two ways:

  1. One by one.
  2. Using a set of concurrent consumers.

WSO2 ESB's JMS transport is also used to send the messages to another queue, demonstrating how WSO2 ESB can act as a producer for a large number of messages. The scenario demonstrates how the transport can process a large number of messages, as either a consumer or a producer, by caching JMS objects and with minimal memory.
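The two consumption modes can be pictured with a small, broker-free sketch. It is only an illustration: the class BacklogDrainSketch and its methods are hypothetical, and an in-memory queue stands in for the persisted JMS queue. The same backlog is drained either by one consumer thread or by several, and in both cases every message is processed exactly once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; an in-memory queue stands in for the JMS broker.
final class BacklogDrainSketch {

    // Builds a backlog of the given size, as if orders had piled up while
    // the consumer was offline.
    static BlockingQueue<String> backlog(int size) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < size; i++) {
            queue.add("order-" + i);
        }
        return queue;
    }

    // Drains the backlog with the given number of consumer threads and
    // returns how many messages were processed in total.
    static int drain(BlockingQueue<String> queue, int consumers) {
        AtomicInteger processed = new AtomicInteger();
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                // poll() hands each message to exactly one consumer and
                // returns null once the backlog is empty.
                while (queue.poll() != null) {
                    processed.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("One consumer processed:   " + drain(backlog(1000), 1));
        System.out.println("Four consumers processed: " + drain(backlog(1000), 4));
    }
}
```

With a real broker the queue lives outside the consumer process, so the backlog survives while the consumer is offline; the sketch only mirrors the consumption pattern.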

Processing of a Long Message Queue using WSO2 ESB's JMS transport

This section describes how the solution was implemented and provides full working sample configurations and a sample JMS client. When using JMS under a really high load, it is important to follow a few best practices for better performance [5]. One of them is to cache JMS objects (such as connections, sessions, consumers and producers) so that they are reused in subsequent invocations. WSO2 ESB's JMS transport uses the following property to define the cache level (that is, which objects should be cached) at startup.

transport.jms.CacheLevel - {none, connection, session, consumer, producer, auto}.

This parameter can be configured either in axis2.xml (when WSO2 ESB acts as a producer) or as a proxy service parameter (when WSO2 ESB acts as a consumer). The values have the following meanings.

Parameter value    Description
none               No JMS objects are cached
connection         JMS connection objects are cached
session            JMS session and connection objects are cached
consumer           JMS connection, session and consumer objects are cached
producer           JMS connection, session and producer objects are cached
auto               An appropriate caching level is used (depending on the transaction strategy)
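The cumulative nature of the cache levels can be summarised in a few lines of Java. This is a minimal sketch for illustration only: the types JmsObject and CacheLevel are hypothetical and are not part of the WSO2 ESB or Axis2 API. The 'auto' value is left out because the level it resolves to depends on the transaction strategy.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical model of the table above; not the transport's own classes.
enum JmsObject { CONNECTION, SESSION, CONSUMER, PRODUCER }

enum CacheLevel {
    NONE(EnumSet.noneOf(JmsObject.class)),
    CONNECTION(EnumSet.of(JmsObject.CONNECTION)),
    SESSION(EnumSet.of(JmsObject.CONNECTION, JmsObject.SESSION)),
    CONSUMER(EnumSet.of(JmsObject.CONNECTION, JmsObject.SESSION, JmsObject.CONSUMER)),
    PRODUCER(EnumSet.of(JmsObject.CONNECTION, JmsObject.SESSION, JmsObject.PRODUCER));

    private final Set<JmsObject> cached;

    CacheLevel(Set<JmsObject> cached) {
        this.cached = cached;
    }

    // True if this cache level keeps the given kind of JMS object for reuse.
    boolean caches(JmsObject object) {
        return cached.contains(object);
    }

    Set<JmsObject> cachedObjects() {
        return cached;
    }

    // Parses a value such as "consumer". "auto" is deliberately unsupported
    // here and results in an IllegalArgumentException.
    static CacheLevel parse(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

final class CacheLevelDemo {
    public static void main(String[] args) {
        for (CacheLevel level : CacheLevel.values()) {
            System.out.println(level.name().toLowerCase(Locale.ROOT)
                    + " caches " + level.cachedObjects());
        }
    }
}
```

Read this way, each level includes everything cached by the level before it, and the consumer and producer levels differ only in the last object they add.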

The JMS Message Backlog proxy configuration

The following configuration describes the JMS proxy. It is a traditional JMS proxy that resembles sample #250 [3], and it performs two functions. First, it receives messages from a queue called 'JMSMSGRecevingProxy' (the name of the proxy service). Second, it sends the messages to a second queue ('SimpleStockQuoteService'), from which the backend service picks them up for processing. The configuration therefore demonstrates how to cache JMS objects when WSO2 ESB acts as both a consumer and a producer.

By default, WSO2 ESB's JMS transport uses the auto cache level. The following configuration uses the consumer level for the receiving side, which, as described earlier, caches the consumers, sessions and connections on that side. How are JMS objects cached on the sending side? The cache parameter for the sending side has to be defined in the JMS connection factory that is used. In the example below it is defined in the 'CachedJMSConnectionFactory' connection factory definition in axis2.xml.

In addition to JMS object caching, the proxy removes a property called 'transportNonBlocking'. This makes sure that the ESB does not create a new thread for each message that is sent out. It is another important setting for keeping the ESB's memory usage constant over time.

<proxy xmlns="http://ws.apache.org/ns/synapse" name="JMSMSGRecevingProxy" transports="jms">
        <target>
            <inSequence>
                <property name="transportNonBlocking" scope="axis2" action="remove"/>
                <property action="set" name="OUT_ONLY" value="true"/>
            </inSequence>
            <endpoint>
                <address uri="jms:/?transport.jms.ConnectionFactory=CachedJMSConnectionFactory"/>
            </endpoint>
        </target>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>text/xml</default>
            </rules>
        </parameter>
        <parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>         
        

Next comes the configuration that enables the JMS transport sender/receiver pair in axis2.xml, located in $ESB_HOME/repository/conf/. The JMS listener configuration is shown first, followed by the sender.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
</transportReceiver>

Below is the configuration for the JMS transport sender. Note that the JMS object cache parameter is set to the producer level. This caches the JMS producers, sessions and connections on the sender's side. The destination is given as dynamicQueues/SimpleStockQuoteService; the SimpleStockQuoteService deployed on the sample Axis2 server listens on that queue and processes the messages.

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
         <parameter name="CachedJMSConnectionFactory" locked="false">
             <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
             <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
             <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
             <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
             <parameter name="transport.jms.CacheLevel" locked="false">producer</parameter>
             <parameter name="transport.jms.Destination" locked="false">dynamicQueues/SimpleStockQuoteService</parameter>
             <parameter name="transport.jms.DestinationType" locked="false">queue</parameter>
         </parameter>
</transportSender>

The JMS Client to Send a Large (~1000) Number of Messages

The code of the JMS client is listed below.

package sample;

import javax.jms.*;
import javax.naming.InitialContext;
import java.util.Properties;

public class JMSClient {
    public static void main(String[] args) {
        try {
            String noItrStr = System.getProperty("no.of.iteration");
            int noItr;
            if (noItrStr == null) {
                noItr = 1000;
            } else {
                noItr = Integer.parseInt(noItrStr);
            }
            // place holder for JNDI properties
            Properties env = new Properties();

            // specify the ActiveMQ specific JNDI properties
            // the JMS provider URL
            env.put("java.naming.provider.url", "tcp://localhost:61616");
            // the initial connection factory
            env.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");

            // create the initial context passing the vendor specific connection factory parameters
            InitialContext ic = new InitialContext(env);

            // look up the Queue Connection factory
            String connectionFactoryName = "ConnectionFactory";
            QueueConnectionFactory confac = (QueueConnectionFactory) ic.lookup(connectionFactoryName);

            // create a Queue connection
            QueueConnection connection = confac.createQueueConnection();

            // create the session and mark for auto acknowledge
            QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

            // look up the destination to send the message
            String queueName = "dynamicQueues/JMSMSGRecevingProxy";
            Queue destination = (Queue) ic.lookup(queueName);

            // create a message sender to send the message
            QueueSender sender = session.createSender(destination);

            // create a text message with the payload for the back end service
            String payLoad =
                  "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">" +
                            "<soapenv:Header xmlns:wsa=\"http://www.w3.org/2005/08/addressing\"/>\n" +
                            "<soapenv:Body>\n" +
                            "<m:placeOrder xmlns:m=\"http://services.samples\">\n" +
                            "    <m:order>\n" +
                            "        <m:price>" + getRandom(100, 0.9, true) + "</m:price>\n" +
                            "        <m:quantity>" + (int) getRandom(10000, 1.0, true) + "</m:quantity>\n" +
                            "        <m:symbol>" + "IBM" + "</m:symbol>\n" +
                            "    </m:order>\n" +
                            "</m:placeOrder> \n" +
                            "</soapenv:Body>\n" +
                            "</soapenv:Envelope>";
            TextMessage tm = session.createTextMessage(payLoad);

            // send the message
            for (int i = 0; i < noItr; i++) {
                sender.send(tm);
            }
            System.out.println("Sent '" + noItr + "', messages!");

            // close the resources
            sender.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static double getRandom(double base, double varience, boolean onlypositive) {
        double rand = Math.random();
        return (base + ((rand > 0.5 ? 1 : -1) * varience * base * rand))
                * (onlypositive ? 1 : (rand > 0.5 ? 1 : -1));
    }
}
       

Deployment

  1. Step 0: Start the ActiveMQ JMS broker.
  2. Step 1: Copy JMSMSGRecevingProxy.xml from the sample folder (attached in the Resources section) into $ESB_HOME/repository/deployment/server/synapse-configs/default/proxy-services.
  3. Step 2: Enable the JMS transport sender/receiver pair as described above, or simply replace $ESB_HOME/repository/conf/axis2.xml with the one provided in the sample folder.
  4. Step 3: Enable the JMS transport receiver in the sample Axis2 server in $ESB_HOME/samples/axis2Server/repository/conf/axis2.xml as described above.
  5. Step 4: Deploy the SimpleStockQuoteService by invoking ant in $ESB_HOME/samples/axis2Server/src/SimpleStockQuoteService.
  6. Step 5: Copy the JMS client jars activemq-core-5.4.1.jar, geronimo-jms_1.1_spec-1.1.1.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar into $ESB_HOME/repository/components/lib.
  7. Step 6: Start the sample Axis2 server using axis2Server.sh (or axis2Server.bat) in $ESB_HOME/samples/axis2Server/.
  8. Step 7: Invoke the client to send 1000 JMS messages using the following command. Adjust the number of iterations and the carbon_home parameter to match your environment.

    ant run -Ditr=no-of-iteration -Dcarbon_home=path-to-your-carbon-home

  9. Step 8: Start WSO2 ESB and observe how it starts to process the large message queue.

Configuring Single Consumer and Multiple Consumers

Another very useful feature of WSO2 ESB's JMS transport is the ability to configure the number of concurrent consumers. The property 'transport.jms.ConcurrentConsumers' can be configured either as a JMS proxy service parameter or as a parameter in a JMS connection factory definition in axis2.xml. By default, the transport uses a single JMS consumer. Set the parameter to 1 to process messages one by one, or to a larger value to process them with that many concurrent consumers. The maximum number of concurrent consumers (or the number of JMS proxies) that can be deployed is limited by the base transport worker pool used by the JMS transport. The size of this worker pool can be configured through the system properties 'snd_t_core' and 'snd_t_max'. By default a maximum of 20 is assumed. To tune the worker pool, pass these parameters as system properties (for example, ./wso2server.sh -Dsnd_t_max=100). The number of concurrent producers is normally limited by the Synapse core worker pool.
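The effect of a bounded worker pool on concurrency can be seen in a small standalone sketch. It is an illustration under stated assumptions, not the transport's actual pool: the class WorkerPoolSketch is hypothetical, it uses a plain fixed-size pool from java.util.concurrent, and it reads only 'snd_t_max' with the default of 20 mentioned above. However many tasks are submitted, no more than the pool size ever run at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a bounded worker pool; not WSO2 ESB code.
final class WorkerPoolSketch {

    // Reads the pool maximum from the 'snd_t_max' system property,
    // falling back to 20 when the property is absent or not a number.
    static int configuredMax() {
        return Integer.getInteger("snd_t_max", 20);
    }

    // Submits the given number of short tasks to a pool of the given size
    // and returns the highest number of tasks observed running at once.
    static int peakConcurrency(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(2); // simulate a little work per message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int max = configuredMax();
        System.out.println("Pool size: " + max);
        System.out.println("Peak concurrent tasks: " + peakConcurrency(500, max));
    }
}
```

Running it with java -Dsnd_t_max=100 WorkerPoolSketch raises the ceiling in the same way the system property is passed to wso2server.sh.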

When dealing with a large number of messages, it is important to allocate sufficient main memory to the system. If your system is exposed to message volumes a couple of orders of magnitude larger than those described in this article, it is probably a good idea to increase the heap size allocated to the Carbon server. This can be done near the end of wso2server.sh (or wso2server.bat) by editing the entry '-Xms256m -Xmx512m -XX:MaxPermSize=256m' appropriately. For further reference, see [4].
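One quick way to confirm that a changed heap setting has taken effect is to ask the JVM itself. The snippet below is a generic sketch that uses only the standard Runtime API; the class name HeapCheck is hypothetical and the code is not part of WSO2 ESB.

```java
// Hypothetical helper that reports the maximum heap available to the JVM.
final class HeapCheck {

    // Maximum heap the JVM will attempt to use, in whole megabytes.
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("Max heap available to this JVM: " + maxHeapMb() + " MB");
    }
}
```

Started with java -Xmx512m HeapCheck, it should report a value close to 512 MB; the exact figure can be slightly lower depending on the JVM and garbage collector in use.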

Conclusion

As this article shows, WSO2 ESB can process a large message queue with minimal memory requirements.

Future work

The configurations given should be sufficient to process a large message backlog. Users can adjust the memory allocation, the number of concurrent consumers and the caching level of JMS objects to suit their requirements. To demonstrate bulk processing, this guide starts the WSO2 ESB server only after the messages have been placed in the queue; in a real deployment the server would typically keep running while it receives batches of messages from time to time.

References:

  1. http://www.oracle.com/technetwork/java/index-jsp-142945.html
  2. http://www.amqp.org/
  3. http://wso2.org/project/esb/java/4.0.3/docs/samples/transport_samples.html#Sample250
  4. http://techfeast-hiranya.blogspot.com/2010/11/taming-java-garbage-collector.html
  5. http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/perf-tuning.html

Further Reading

  1. http://wso2.org/library/articles/2011/10/wso2-esb-example-jms-failover
  2. http://wso2.org/library/articles/2011/12/wso2-esb-example-pubsub-soa
  3. http://wso2.org/library/articles/2011/11/wso2-esb-example-two-wayrequestresponse-semantic-jms

Resources

Author

Rajika Kumarasiri, Senior Software Engineer, rajika@wso2.com