2012/06/07

WSO2 ESB by Example - Processing a Long Message Backlog

  • Rajika Kumarasiri
  • Senior Software Engineer - WSO2

Applies To

WSO2 ESB 4.0.0 and above
Apache ActiveMQ 5.4.1

Introduction

The Java Message Service (JMS) API [1] still plays a major role in Enterprise Application Integration.
It may still be the most popular messaging API on the market, despite the recent popularity of
messaging protocols such as the Advanced Message Queuing Protocol (AMQP) [2]. JMS is widely used in
enterprise integration not only because it is mature but also because of some very useful features
that it supports. One such key feature is the ability to persist a message
when the other party (the consumer) is not online. This feature helps to solve
many practical integration problems. For example, consider a factory that produces garments for
the world market. Assume the factory is located in Sri Lanka, and the orders are processed at an office
in the USA and then sent to the Sri Lankan factory. The factory and the office are at opposite
ends of the world, and the factory waits for orders from the USA office. Since the time difference
between the two countries is nearly half a day, how can the employees in both
countries still go home at the end of a normal working day (5.00 pm, for example)? The USA office
needs to send the orders during its own working hours, and those order requests need to be persisted
until the employees of the Sri Lankan factory come to work; otherwise the employees at one
site would have to wait overnight. This simple
example shows how one of the key features of JMS, the
ability to persist messages for offline consumers, avoids the problem.

WSO2 ESB has a production-ready JMS transport that can be used to implement various
enterprise integration requirements with JMS. This article describes how to
use the WSO2 ESB JMS transport to process a long message backlog.
Although the scenario is simple and straightforward to implement with
WSO2 ESB, it often plays an important role as part of a larger integration scenario.

The first section describes the scenario in some detail. The sections that follow
describe the configurations that can be used to run it. Finally, the article
ends with a list of further reading on integrating
different systems with WSO2 ESB using the JMS transport.

Processing a Long Message Backlog

In many business scenarios a large message backlog (for example, a message queue
holding 1000 messages) has to be processed periodically, perhaps once a day or once a week. This
use case demonstrates how to configure the WSO2 ESB JMS transport for a high-performance
deployment. This is possible because the JMS transport can cache
JMS objects such as connections, sessions, consumers and producers.
The caching is described using a general scenario in which the WSO2 ESB JMS transport
processes a large (~1000) message backlog. These hints should help you deploy the JMS transport
in high-performance deployments.

Use Case

This use case was implemented as a solution to a real-world problem. An order processing system
receives a large batch of order requests (~1000 messages), and those requests must be processed one by one
at a later time (generally after a couple of days). The orders must
be persisted and kept safe until the order processing system consumes them. When the order processing
system comes online (after a couple of days), the persisted messages are read and processed.

Solution

The batch of order request messages is kept on a JMS queue that resides on a JMS broker. For
this demonstration the article uses the Apache ActiveMQ JMS broker. WSO2 ESB acts as a consumer
of the order requests, and once the messages are available they are processed in one of two ways:

  1. One by one.
  2. Using a set of concurrent consumers.

The WSO2 ESB JMS transport is also used to send the messages to another queue,
which demonstrates how WSO2 ESB can act as a producer for a large number of
messages. The scenario shows how the JMS transport can
process a large number of messages, as either a consumer or a producer, by caching JMS objects and with minimal memory usage.

Processing of a Long Message Queue using WSO2 ESB's JMS transport

This section describes how the solution was implemented, and provides the full working
sample configurations and a sample JMS client. When JMS is used under a really high
load, it is important to follow a few best practices for better performance [5]. One of them
is to cache the JMS objects (such as connections, sessions, consumers and producers)
so that they are reused in subsequent invocations. The WSO2 ESB JMS transport uses the following
property to define the cache level (that is, which objects should be cached) at startup.

transport.jms.CacheLevel - {none, connection, session, consumer, producer, auto}.

This parameter can be configured either in axis2.xml (when WSO2 ESB acts as a producer) or
as a proxy service parameter (when WSO2 ESB acts as a consumer).
Each value has the following meaning.

Parameter value   Description
none              No JMS objects are cached
connection        JMS connection objects are cached
session           JMS connection and session objects are cached
consumer          JMS connection, session and consumer objects are cached
producer          JMS connection, session and producer objects are cached
auto              An appropriate cache level is used (depending on the transaction strategy)
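
The cache levels can be read as cumulative: each level keeps everything the lower level keeps, plus one more kind of object. The following Java sketch models that reading and counts how many objects a sender would create for a backlog of messages. It is a simplified illustration, not the transport's implementation: the names CacheLevelSketch, Kind and creationsForSend are hypothetical, the 'auto' level is left out, and the sketch assumes that an uncached object is created once per message.

```java
import java.util.EnumMap;
import java.util.Map;

class CacheLevelSketch {

    /** The kinds of JMS object that may be kept between messages. */
    enum Kind { CONNECTION, SESSION, CONSUMER, PRODUCER }

    /** Cache levels from the table; 'auto' is omitted from this sketch. */
    enum CacheLevel {
        NONE, CONNECTION, SESSION, CONSUMER, PRODUCER;

        /** Returns true if objects of the given kind are reused at this level. */
        boolean caches(Kind kind) {
            switch (this) {
                case CONNECTION: return kind == Kind.CONNECTION;
                case SESSION:    return kind == Kind.CONNECTION || kind == Kind.SESSION;
                case CONSUMER:   return kind != Kind.PRODUCER;
                case PRODUCER:   return kind != Kind.CONSUMER;
                default:         return false; // NONE caches nothing
            }
        }
    }

    /** Counts the objects of each kind a sender would create for the given number of messages. */
    static Map<Kind, Integer> creationsForSend(CacheLevel level, int messages) {
        Map<Kind, Integer> created = new EnumMap<>(Kind.class);
        for (Kind kind : new Kind[] {Kind.CONNECTION, Kind.SESSION, Kind.PRODUCER}) {
            // Assumption: a cached object is created once, an uncached one per message.
            created.put(kind, level.caches(kind) ? Math.min(1, messages) : messages);
        }
        return created;
    }

    public static void main(String[] args) {
        System.out.println("none:     " + creationsForSend(CacheLevel.NONE, 1000));
        System.out.println("session:  " + creationsForSend(CacheLevel.SESSION, 1000));
        System.out.println("producer: " + creationsForSend(CacheLevel.PRODUCER, 1000));
    }
}
```

Under these assumptions, the producer level is the only one at which sending a 1000-message backlog creates a single connection, session and producer, which is why it is the level of interest on the sending side.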

The JMS Message Backlog proxy configuration

The following configuration defines the JMS proxy. It is a traditional JMS
proxy that resembles sample #250 [3]. It performs two functions. First, the JMS proxy
receives messages from a queue called 'JMSMSGRecevingProxy' (the name of the proxy service).
Second, it sends those messages to a second queue ('SimpleStockQuoteService', from which the backend service
picks up messages for processing). The proxy configuration demonstrates how to cache
JMS objects when WSO2 ESB acts as both a consumer and a producer. By default the WSO2 ESB
JMS transport uses the auto cache level. The configuration below uses the consumer cache level
on the receiving side. As described earlier, this caches the consumers,
sessions and connections on the receiving side. How are the JMS objects cached
on the sending side? The cache parameter for the sending side has to be defined in the JMS connection factory
that is used for sending. In the example below the cache parameter is defined in the
'CachedJMSConnectionFactory' connection factory definition in axis2.xml. In addition to the JMS object
caching, the proxy also removes a property called 'transportNonBlocking'. This makes sure that the ESB does not
create a new thread for each message that is sent out. It is another important setting
for keeping the ESB's memory usage constant over time.

<proxy xmlns="http://ws.apache.org/ns/synapse" name="JMSMSGRecevingProxy" transports="jms">
        <target>
            <inSequence>
                <property name="transportNonBlocking" scope="axis2" action="remove"/>
                <property action="set" name="OUT_ONLY" value="true"/>
            </inSequence>
            <endpoint>
                <address uri="jms:/?transport.jms.ConnectionFactory=CachedJMSConnectionFactory"/>
            </endpoint>
        </target>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>text/xml</default>
            </rules>
        </parameter>
        <parameter name="transport.jms.CacheLevel">consumer</parameter>
</proxy>         
        

Next comes the configuration that enables the JMS transport receiver/sender pair in axis2.xml, located in
$ESB_HOME/repository/conf/. The configurations for the JMS listener and the JMS sender are given below, in that order.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
</transportReceiver>

Below is the configuration for the JMS transport sender. Note how the JMS cache level is set
to producer. This caches the producers, sessions and connections
on the sending side. The destination is given as dynamicQueues/SimpleStockQuoteService; the
SimpleStockQuoteService deployed on the sample Axis2 server listens on this queue and
processes the messages.

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
         <parameter name="CachedJMSConnectionFactory" locked="false">
             <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
             <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
             <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
             <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
             <parameter name="transport.jms.CacheLevel" locked="false">producer</parameter>
             <parameter name="transport.jms.Destination" locked="false">dynamicQueues/SimpleStockQuoteService</parameter>
             <parameter name="transport.jms.DestinationType" locked="false">queue</parameter>
         </parameter>
</transportSender>

The JMS Client to Send a Large (~1000) Number of Messages

The following is the code of the JMS client used to send the messages.

 package sample;

import javax.jms.*;
import javax.naming.InitialContext;
import java.util.Properties;

public class JMSClient {
    public static void main(String[] args) {
        try {
            String noItrStr = System.getProperty("no.of.iteration");
            int noItr;
            if (noItrStr == null) {
                noItr = 1000;
            } else {
                noItr = Integer.parseInt(noItrStr);
            }
            // place holder for JNDI properties
            Properties env = new Properties();

            // specify the ActiveMQ specific JNDI properties
            // the JMS provider URL
            env.put("java.naming.provider.url", "tcp://localhost:61616");
            // the initial connection factory
            env.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");

            // create the initial context passing the vendor specific connection factory parameters
            InitialContext ic = new InitialContext(env);

            // look up the Queue Connection factory
            String connectionFactoryName = "ConnectionFactory";
            QueueConnectionFactory confac = (QueueConnectionFactory) ic.lookup(connectionFactoryName);

            // create a Queue connection
            QueueConnection connection = confac.createQueueConnection();

            // create the session and mark for auto acknowledge
            QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

            // look up the destination to send the message
            String queueName = "dynamicQueues/JMSMSGRecevingProxy";
            Queue destination = (Queue) ic.lookup(queueName);

            // create a message sender to send the message
            QueueSender sender = session.createSender(destination);

            // create a text message with the payload for the back end service
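            String payLoad =
                  "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">" +
                            "<soapenv:Header xmlns:wsa=\"http://www.w3.org/2005/08/addressing\"/>\n" +
                            "<soapenv:Body>\n" +
                            "<m:placeOrder xmlns:m=\"http://services.samples\">\n" +
                            "    <m:order>\n" +
                            "        <m:price>" + getRandom(100, 0.9, true) + "</m:price>\n" +
                            "        <m:quantity>" + (int) getRandom(10000, 1.0, true) + "</m:quantity>\n" +
                            "        <m:symbol>" + "IBM" + "</m:symbol>\n" +
                            "    </m:order>\n" +
                            "</m:placeOrder> \n" +
                            "</soapenv:Body>\n" +
                            "</soapenv:Envelope>";
            TextMessage tm = session.createTextMessage(payLoad);

            // send the same message noItr times
            for (int i = 0; i < noItr; i++) {
                sender.send(tm);
            }

            // release the JMS resources once all messages are sent
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // returns a pseudo-random value around base, scaled by varience
    private static double getRandom(double base, double varience, boolean onlypositive) {
        double rand = Math.random();
        return (base + ((rand > 0.5 ? 1 : -1) * varience * base * rand))
                * (onlypositive ? 1 : (rand > 0.5 ? 1 : -1));
    }
}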
       

Deployment

  1. Step 0: Start the ActiveMQ JMS broker.
  2. Step 1: Copy JMSMSGRecevingProxy.xml from the sample folder (attached in the Resources section) into
    $ESB_HOME/repository/deployment/server/synapse-configs/default/proxy-services.
  3. Step 2: Enable the JMS transport sender/receiver pair as described above, or simply replace
    $ESB_HOME/repository/conf/axis2.xml with the one provided in the sample folder.
  4. Step 3: Enable the JMS transport receiver in the sample Axis2 server in
    $ESB_HOME/samples/axis2Server/repository/conf/axis2.xml as described above.
  5. Step 4: Deploy the SimpleStockQuoteService by invoking ant in
    $ESB_HOME/samples/axis2Server/src/SimpleStockQuoteService.
  6. Step 5: Copy the JMS client jars activemq-core-5.4.1.jar, geronimo-jms_1.1_spec-1.1.1.jar and
    geronimo-j2ee-management_1.1_spec-1.0.1.jar into $ESB_HOME/repository/components/lib.
  7. Step 6: Start the sample Axis2 server using axis2Server.sh (or axis2Server.bat) in
    $ESB_HOME/samples/axis2Server/.
  8. Step 7: Invoke the client to send 1000 JMS messages using the following command.
    Adjust the number of iterations and the carbon_home parameter to match your environment.

    ant run -Ditr=no-of-iteration -Dcarbon_home=path-to-your-carbon-home

  9. Step 8: Start WSO2 ESB and observe how it starts to process the large message queue.

Configuring Single Consumer and Multiple Consumers

Another very useful feature of the WSO2 ESB JMS transport is the ability to configure the
number of concurrent consumers. The 'transport.jms.ConcurrentConsumers' property can be set
either as a JMS proxy service parameter or as a parameter of the JMS connection factory definition in
axis2.xml. By default the JMS transport uses a single JMS consumer. Set the parameter
to 1 to process messages one by one, or to a larger value to process them
with that many concurrent consumers. The maximum number of concurrent consumers (or the number of JMS proxies) that
can be deployed is limited by the base transport worker pool used by the JMS transport. The size of this worker pool can be configured
via the system properties 'snd_t_core' and 'snd_t_max'. By default a maximum of 20 is assumed. To tune the worker pool, pass these
parameters as system properties (for example, ./wso2server.sh -Dsnd_t_max=100).
The number of concurrent producers is normally limited by the Synapse core worker pool.
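
The difference between a single consumer and a set of concurrent consumers can be pictured with a plain Java simulation. The sketch below does not use JMS or the ESB at all: an in-memory BlockingQueue stands in for the broker queue, and a fixed thread pool stands in for the transport worker pool. The class and method names (BacklogConsumerSketch, drain, backlogOf) are hypothetical, and capping the consumers at the pool maximum is a modelling assumption about how the worker pool limit takes effect. Reading 'snd_t_max' with a default of 20 mirrors the description above but is done here only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BacklogConsumerSketch {

    /** Drains the backlog with the requested number of consumers and returns the processed count. */
    static int drain(BlockingQueue<String> backlog, int concurrentConsumers, int poolMax) {
        // Assumption: the worker pool limit caps the number of consumers that can run.
        int consumers = Math.max(1, Math.min(concurrentConsumers, poolMax));
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        AtomicInteger processed = new AtomicInteger();
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                // poll() returns null once the backlog is empty, which ends this consumer
                while (backlog.poll() != null) {
                    processed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("backlog was not drained in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return processed.get();
    }

    /** Builds an in-memory backlog holding the given number of placeholder messages. */
    static BlockingQueue<String> backlogOf(int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add("order-" + i);
        }
        return queue;
    }

    public static void main(String[] args) {
        int poolMax = Integer.getInteger("snd_t_max", 20);
        System.out.println("one by one: " + drain(backlogOf(1000), 1, poolMax));
        System.out.println("concurrent: " + drain(backlogOf(1000), 10, poolMax));
    }
}
```

Both runs process every message; the only difference is how many threads share the work, which is the trade-off the 'transport.jms.ConcurrentConsumers' setting exposes.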

When dealing with a large number of messages it is important to allocate sufficient main
memory to the system. If your system is exposed to message volumes a couple of orders of magnitude larger than
those described in this article, it is probably a good idea to increase the heap size allocated to the Carbon server.
This can be done near the end of wso2server.sh (or wso2server.bat) by editing the entry '-Xms256m -Xmx512m -XX:MaxPermSize=256m'
appropriately. For further reference, see [4].
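
After changing the heap settings, it can be useful to confirm what the JVM actually received. The following small Java program is a generic check, not part of WSO2 ESB; the class name HeapCheck is hypothetical. It prints the maximum heap reported by the standard Runtime API, which should roughly track the -Xmx value the JVM was started with.

```java
class HeapCheck {

    /** Maximum heap the JVM will attempt to use, in whole megabytes. */
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("Max heap available to this JVM: " + maxHeapMb() + " MB");
    }
}
```

For example, running it as `java -Xmx512m HeapCheck.java` (Java 11 or later) should report a value close to 512 MB; the exact figure can differ slightly between JVMs and garbage collectors.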

Conclusion

As this article shows, WSO2 ESB can process a large message queue with minimal
memory requirements.

Future work

The given configurations should be sufficient to process a large message backlog. Users can
configure the memory allocation, the number of concurrent consumers and the caching level of
JMS objects according to their requirements. To demonstrate the processing of a batch of messages,
this guide starts the WSO2 ESB server after the messages are placed in the queue. In the
real world the server would typically keep running while it receives batches of messages from time to
time.

References:

  1. https://www.oracle.com/technetwork/java/index-jsp-142945.html
  2. https://www.amqp.org/
  3. https://wso2.org/project/esb/java/4.0.3/docs/samples/transport_samples.html#Sample250
  4. https://techfeast-hiranya.blogspot.com/2010/11/taming-java-garbage-collector.html
  5. https://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/perf-tuning.html

Further Reading

  1. https://wso2.org/library/articles/2011/10/wso2-esb-example-jms-failover
  2. https://wso2.org/library/articles/2011/12/wso2-esb-example-pubsub-soa
  3. https://wso2.org/library/articles/2011/11/wso2-esb-example-two-wayrequestresponse-semantic-jms

Resources

Author

Rajika Kumarasiri, Senior Software Engineer, WSO2 Inc.

 
