2011/11/11
11 Nov, 2011

WSO2 ESB by Example - Two way (request/response semantic) JMS

  • Rajika Kumarasiri
  • Senior Software Engineer - WSO2

Applies To

WSO2 ESB 4.0.0 and above

Introduction

The Java Message Service (JMS) API [1] still plays a major role in Enterprise Application
Integration. It may well be the most popular messaging API in use, despite the recent rise
of alternatives such as the Advanced Message Queuing Protocol (AMQP) [2]. JMS is most
widely used for one-way messaging: a producer (in the case of a queue) or a publisher (in
the case of a topic) sends a message to the destination and forgets about it, and on the
other side a consumer (queue) or a subscriber (topic) consumes it. Although the JMS API is
inherently asynchronous, it is sometimes important for the JMS client that sent a message
to wait for a response to it. Individual producers (or publishers) can be tied to
individual consumers (or subscribers) so that together they act with request/response
semantics. This allows us to build powerful request/response messaging scenarios using JMS.

WSO2 ESB has a production-ready JMS transport which can be used to implement various
enterprise integration requirements over JMS. This article describes specifically how to
implement two-way (request/response) JMS scenarios using WSO2 ESB's JMS transport.
Although these scenarios are simple and straightforward to implement with WSO2 ESB, they
play an important role as parts of larger integration scenarios.

The first section covers how a two-way JMS exchange is achieved in general, given the
asynchronous nature of JMS. The subsequent sections describe three different two-way JMS
scenarios that can be implemented using WSO2 ESB. The samples are complete, working
configurations, so anybody can use them as the basis for implementing two-way JMS
scenarios in production with WSO2 ESB. All scenarios come with the required
configurations and code examples so that readers can try the samples on their own. The
material is as self-explanatory as possible, but a basic knowledge of WSO2 ESB will be
helpful. The ESB quick start guide [6] will help a newcomer to quickly become familiar
with WSO2 ESB.

Two way (request/response) JMS

As described in the introduction, the classic form of JMS messaging is one-way messaging:
the producer (or the publisher) sends the message and forgets about it, and on the other
side the consumer (or the subscriber) consumes it. A third software component, known as
the JMS broker, provides the following important functionality, among others.

  1. It stores messages for consumers that are offline.
  2. It lets producers and consumers communicate without knowing about each other.

The endpoint that a JMS client uses on the message broker is called a destination. It can
be either a queue or a topic that resides on the JMS broker, and it provides the
connection between the message producer and consumer (or the publisher and the
subscriber). Two-way JMS communication is achieved by using another queue or topic
together with the JMSReplyTo [3] header. Before sending the message, the producer creates
another destination (either a queue or a topic) and sets it as the JMSReplyTo header. On
the other side, the message consumer (or the subscriber) uses the JMSReplyTo header to
send the response back to that reply destination. The message producer waits on the reply
destination and processes the response when it arrives.

A natural question is how to match requests and responses if the same reply queue is used
for more than one message, or for concurrent messages. In other words, how do we know
which request a given response belongs to? That is done with the JMSCorrelationID [3]
header. The responder copies an identifier from the request into the JMSCorrelationID of
the response, and the requester creates its consumer on the JMSReplyTo destination so
that it accepts only the response carrying the expected JMSCorrelationID. More
sophisticated request/reply matching (selecting messages based on other headers and
properties, for example) can be achieved by specifying a JMS message selector when
creating the consumer.
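
The correlation step can be illustrated without a broker. The following is a minimal
in-memory sketch, not JMS code: plain java.util.concurrent queues stand in for
destinations, and the Msg class with its replyTo and correlationId fields is a
hypothetical stand-in for a JMS message and its JMSReplyTo and JMSCorrelationID headers.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for JMS request/reply: no broker and no JMS classes are involved.
class RequestReplySketch {

    // Hypothetical message type carrying the two headers discussed above.
    static final class Msg {
        final String body;
        final String correlationId;        // plays the role of JMSCorrelationID
        final BlockingQueue<Msg> replyTo;  // plays the role of JMSReplyTo

        Msg(String body, String correlationId, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Consumer side: take one request and answer on the destination it names.
    static void serveOne(BlockingQueue<Msg> requests) {
        try {
            Msg request = requests.take();
            // Copy the correlation ID so that the requester can match the reply.
            request.replyTo.put(new Msg("reply to " + request.body, request.correlationId, null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side: send a request, then wait for the reply that carries the same ID.
    // Returns null if no matching reply arrives within the timeout.
    static String requestReply(BlockingQueue<Msg> requests, BlockingQueue<Msg> replies,
                               String body, long timeoutMillis) {
        String id = UUID.randomUUID().toString();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            requests.put(new Msg(body, id, replies));
            while (true) {
                long remaining = deadline - System.currentTimeMillis();
                Msg reply = remaining > 0 ? replies.poll(remaining, TimeUnit.MILLISECONDS) : null;
                if (reply == null) {
                    return null;
                }
                if (id.equals(reply.correlationId)) {
                    return reply.body;
                }
                // Not ours: dropped here for brevity. A JMS consumer created with a
                // message selector would leave it on the queue for its real owner.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Runs one round trip over a shared reply queue that already holds a stray reply.
    static String demo(String body) {
        BlockingQueue<Msg> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> replies = new LinkedBlockingQueue<>();
        replies.add(new Msg("stray reply", "some-other-id", null));
        Thread responder = new Thread(() -> serveOne(requests));
        responder.start();
        return requestReply(requests, replies, body, 2000);
    }

    public static void main(String[] args) {
        System.out.println(demo("MSFT"));
    }
}
```

In this sketch the stray reply that is already sitting on the shared reply queue is
skipped because its correlation ID does not match, and only the reply produced for the
request is returned.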

A JMS temporary queue or topic is used as the reply destination in most two-way JMS
scenarios. Temporary destinations have an advantage here: a temporary queue exists only
for the lifetime of the connection that created it, and when it disappears the broker
resources allocated to it are released.

Two way JMS scenarios using WSO2 ESB

WSO2 ESB's JMS transport uses temporary queues, JMSCorrelationID, message selectors and
JMSReplyTo to achieve two-way messaging. A user can configure the reply destination type
and name if required. If these properties are missing, the JMS transport assumes a
temporary queue. A user can also configure the message selector. The JMS transport
handles the request/response correlation internally, so the user does not need to deal
with it. The following parameters [4] configure the reply destination type, the reply
destination JNDI name and the message selector.

  • transport.jms.ReplyDestinationType - Reply destination type (queue or topic)
  • transport.jms.ReplyDestination - JNDI name of the reply destination
  • transport.jms.MessageSelector - Message selector to be used
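
As a rough illustration of how these parameters might be combined, the following sketch
assembles a jms:/ endpoint address of the kind used in the scenarios below. The
JmsEndpointUri helper and the reply destination name ResponseQueue are made up for this
example, and whether a given parameter is honoured on an endpoint address or only as a
service parameter should be checked against the transport documentation [4].

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: assembles a jms:/ endpoint address from a destination name
// and a set of transport parameters.
class JmsEndpointUri {

    // Joins the parameters as a query string, in insertion order.
    static String build(String destination, Map<String, String> params) {
        StringBuilder uri = new StringBuilder("jms:/").append(destination);
        char separator = '?';
        for (Map.Entry<String, String> p : params.entrySet()) {
            uri.append(separator).append(p.getKey()).append('=').append(p.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    // Escapes the characters that must not appear verbatim in an XML attribute value,
    // so that the address can be pasted into a uri="..." attribute.
    static String forXmlAttribute(String uri) {
        return uri.replace("&", "&amp;").replace("<", "&lt;").replace("\"", "&quot;");
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("transport.jms.ConnectionFactoryJNDIName", "QueueConnectionFactory");
        params.put("transport.jms.DestinationType", "queue");
        params.put("transport.jms.ReplyDestinationType", "queue");
        // "ResponseQueue" is a made-up JNDI name used only for illustration.
        params.put("transport.jms.ReplyDestination", "ResponseQueue");
        System.out.println(forXmlAttribute(build("SimpleStockQuoteService", params)));
    }
}
```

The escaping step matters when the address is placed in a configuration file: a raw
ampersand inside an XML attribute makes the document ill-formed.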

The following sections describe each of the two-way scenarios in detail, together with
how to configure and test it. Note that in each case a temporary queue is used as the
reply destination.

Setting up the environment

All of the following scenarios come with sample configurations and the required Java
client code, so that the reader can try them out. Apache ActiveMQ [5] is used as the JMS
broker. To use WSO2 ESB with the ActiveMQ broker, the following JMS client libraries
should be copied into $ESB_HOME/lib/api. The ActiveMQ client JARs
activemq-core-VERSION.jar and geronimo-j2ee-management-VERSION.jar
are available in the lib folder of the ActiveMQ distribution.

JMS transport sender and receiver configurations of WSO2 ESB should be enabled by
uncommenting the following sections in $ESB_HOME/repository/conf/axis2.xml.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
             <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
             <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
             <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
             <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
             <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
             <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
             <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
             <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
</transportReceiver>
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

Next, start the ActiveMQ broker by running ./activemq-admin(.bat) start.

These are the general steps that are required to run the following examples. Additional
configuration options will be presented under each section if necessary.
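
Before moving on, it can be useful to confirm that the broker is actually accepting
connections. The following is a small sketch of such a check, assuming the broker uses
the tcp://localhost:61616 provider URL shown in the axis2.xml fragment above. The
BrokerPortCheck class is not part of WSO2 ESB or ActiveMQ, and the check only shows that
something is listening on the port, not that it is a JMS broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for a quick connectivity check against the broker port.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, unknown host and timeouts all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // 61616 is the port from the java.naming.provider.url parameter above.
        System.out.println("broker reachable: " + isListening("localhost", 61616, 1000));
    }
}
```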

Scenario 1 - A generic JMS client is listening on a temporary queue for a
response to a message sent to WSO2 ESB

The detailed scenario is as follows. A JMS client sends a message to a JMS queue on a JMS
broker. WSO2 ESB picks up the message from the queue and places the response from the
back-end service in a temporary queue, on which the client listens for the response. When
initiating the communication, the client creates the temporary queue and sets it as the
reply queue, as described above, and WSO2 ESB uses that queue for placing the response.

The following configuration shows the proxy service for the ESB. It defines a proxy
which reads messages from a JMS queue.

<definitions xmlns="http://ws.apache.org/ns/synapse">
    <proxy name="StockQuoteProxy" transports="jms">
        <target>
            <endpoint>
                <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
            </endpoint>
            <outSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
                <send/>
            </outSequence>
        </target>
        <publishWSDL uri="http://localhost:9000/services/SimpleStockQuoteService?wsdl"/>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
    </proxy>
</definitions>

The following sample code shows the generic JMS client. The code is self-explanatory,
with comments. Note how a temporary queue is set as the reply queue.

package samples.userguide;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.annotation.Generated;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.Properties;

public class GenericJMSClient {
    private QueueConnection connection;
    private QueueSession session;
    private QueueSender sender;
    private TemporaryQueue temporaryQueue;


    public static void main(String[] args) {

        String dest = getProperty("jms_dest", "dynamicQueues/JMSTextProxy");
        String type = getProperty("jms_type", "text");
        String param = getProperty("jms_payload",
                getRandom(100, 0.9, true) + " " + (int) getRandom(10000, 1.0, true) + " IBM");
        String sMsgCount = getProperty("jms_msgcount", null);

        GenericJMSClient app = new GenericJMSClient();
        int msgCount = sMsgCount == null ? 1 : Integer.parseInt(sMsgCount);
        try {
            app.connect(dest);
        } catch (Exception e) {
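            System.err.println("Could not connect to broker : " + e.getMessage());
        }
        if ("text".equalsIgnoreCase(type)) {
            for (int i = 0; i < msgCount; i++) {
                // [...] (rest of main() and the connect/send/receive methods not shown)
            }
        }
    }

    // [...] (start of the helper that reads a file into a byte array not shown)
        while (offset < bytes.length
                && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) {
            offset += numRead;
        }

        // Ensure all the bytes have been read in
        if (offset < bytes.length) {
            // [...]
        }
        // [...]
    }

    // Returns base varied randomly by up to the given fraction of base.
    private static double getRandom(double base, double varience, boolean onlypositive) {
        double rand = Math.random();
        return (base + ((rand > 0.5 ? 1 : -1) * varience * base * rand))
                * (onlypositive ? 1 : (rand > 0.5 ? 1 : -1));
    }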

    private static String getProperty(String name, String def) {
        String result = System.getProperty(name);
        if (result == null || result.length() == 0) {
            result = def;
        }
        return result;
    }
}

Running the scenario 1

  1. Enable the JMS transport sender in $ESB_HOME/repository/conf/axis2.xml. Sample
    configurations are available in the sample configuration attached in the Resource
    section.
  2. Deploy SimpleStockQuoteService as the back-end service. Newcomers are encouraged to
    read the ESB quick start guide [6] to see how to deploy it.
  3. Replace the client in
    $ESB_HOME/samples/axis2Client/src/samples/userguide/GenericJMSClient.java with the
    attached client.
  4. Once WSO2 ESB is started with this configuration, run the client using the command:

    ant jmsclient -Djms_type=pox -Djms_dest=dynamicQueues/StockQuoteProxy -Djms_payload=MSFT

The sample JMS client reads the response from a JMS temporary queue. This demonstrates
how WSO2 ESB can send the response to a temporary queue that is named in a JMS message
header. A pre-defined queue can be used instead of a temporary queue by simply modifying
the JMS client.

Scenario 2 - WSO2 ESB is listening on a temporary queue for a response to
a message sent to a backend service

In this scenario a proxy service that is exposed over HTTP sends a message to a JMS
destination. The exchange is two-way, so the ESB blocks and waits for a response from the
back end. The back-end service responds on a temporary queue (or a pre-defined queue) on
which the ESB is listening. Once the ESB receives the response over JMS, it delivers it
to the client.

Required sample configurations are available in the sample distribution.

The following configuration shows the proxy service. It has a JMS endpoint which places
the message on a JMS queue and listens on a temporary queue for the response. Note how
some properties are set and removed in the configuration; this makes sure that the
two-way exchange works over JMS in this particular scenario. Depending on the JMS broker
in use, other properties may have to be added or removed as well.

<definitions xmlns="http://ws.apache.org/ns/synapse">
    <proxy name="StockQuoteProxy" transports="http">
        <target>
            <endpoint>
                <address
                      uri="jms:/SimpleStockQuoteService?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
            <inSequence>
                <property action="set" name="transport.jms.ContentTypeProperty" value="Content-Type" scope="axis2"/>
            </inSequence>
            <outSequence>
                <property action="remove" name="TRANSPORT_HEADERS" scope="axis2"/>
                <send/>
            </outSequence>
        </target>
        <publishWSDL uri="file:repository/samples/resources/proxy/sample_proxy_1.wsdl"/>
    </proxy>
</definitions>

Running the scenario 2

  1. Enable the JMS transport sender in $ESB_HOME/repository/conf/axis2.xml.
  2. Enable the JMS transport receiver in
    $ESB_HOME/samples/axis2Server/repository/conf/axis2.xml.
  3. Deploy SimpleStockQuoteService as the back-end service.
  4. Once WSO2 ESB is started with this configuration, invoke the traditional
    HTTP client with the command:

    ant stockquote -Daddurl=http://localhost:8280/services/StockQuoteProxy -Dsymbol=MSFT

Scenario 3 - A generic JMS client is listening on a temporary queue for
a response to a message sent to ESB and ESB is listening on a temporary queue for a
response to a message sent to a backend service

In this scenario a JMS client sends a message to a JMS queue and waits for a response on
a temporary queue. WSO2 ESB picks up the message from the JMS queue, sends it to the
back-end server over JMS and waits for the response on a temporary queue of its own. The
back-end server picks up the message from its JMS queue and places the response on the
temporary queue that WSO2 ESB is listening on. Once the ESB receives the response, it
places it on the temporary queue that the JMS client is listening on. Finally, the JMS
client reads the response from that temporary queue.
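
The two hops can be modelled in a few lines of plain Java. The sketch below is an
in-memory illustration only: java.util.concurrent queues stand in for JMS destinations,
a fresh queue per call stands in for a temporary queue, and the class, method and queue
names are all hypothetical. The mediator method plays the role that the ESB proxy plays
in this scenario.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory model of the two-hop flow; plain Java queues stand in for JMS destinations.
class TwoHopSketch {

    // Hypothetical message: a body plus the queue on which the sender expects the reply.
    static final class Msg {
        final String body;
        final BlockingQueue<String> replyTo;

        Msg(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Sends a request with a fresh "temporary" reply queue and waits for the answer.
    // Returns null if nothing arrives within two seconds.
    static String call(BlockingQueue<Msg> destination, String body) {
        BlockingQueue<String> temporaryQueue = new LinkedBlockingQueue<>();
        try {
            destination.put(new Msg(body, temporaryQueue));
            return temporaryQueue.poll(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Back end: answers one request on the reply queue named in the message.
    static void backend(BlockingQueue<Msg> serviceQueue) {
        try {
            Msg request = serviceQueue.take();
            request.replyTo.put("quote for " + request.body);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Mediator: forwards one request to the back end and relays the reply to the client.
    static void mediator(BlockingQueue<Msg> proxyQueue, BlockingQueue<Msg> serviceQueue) {
        try {
            Msg request = proxyQueue.take();
            String reply = call(serviceQueue, request.body);  // second temporary queue
            if (reply != null) {
                request.replyTo.put(reply);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Wires the three parties together and performs one round trip.
    static String demo(String symbol) {
        BlockingQueue<Msg> proxyQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> serviceQueue = new LinkedBlockingQueue<>();
        new Thread(() -> backend(serviceQueue)).start();
        new Thread(() -> mediator(proxyQueue, serviceQueue)).start();
        return call(proxyQueue, symbol);
    }

    public static void main(String[] args) {
        System.out.println(demo("MSFT"));
    }
}
```

Because each call creates its own reply queue, no correlation ID is needed in this
sketch; a reply can only belong to the request that named that queue.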

The configuration for the ESB (the proxy service) is given below. The proxy listens on a
queue for a message and sends that message to another JMS queue, from which the back-end
service picks it up. Note that the proxy service does not have a WSDL attached. This is
to ensure that any message sent to the JMS queue is picked up by WSO2 ESB.

<definitions xmlns="http://ws.apache.org/ns/synapse">
    <proxy name="StockQuoteProxy" transports="jms">
        <target>
            <endpoint>
                <address uri="jms:/SimpleStockQuoteService?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
            <outSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
                <send/>
            </outSequence>
        </target>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
    </proxy>
</definitions>

In this scenario a slightly modified version of the JMS client presented in scenario 1
can be used. The entire payload now has to be placed in the JMS text message.
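
One way to read "the entire payload" is a complete request document for the back-end
service, rather than just a stock symbol. The following sketch builds such a document as
a string. The getQuote element names and the http://services.samples namespace follow
the SimpleStockQuoteService sample as commonly shipped with the ESB; they are
assumptions here and should be checked against the service's WSDL. The
QuoteRequestPayload class itself is hypothetical.

```java
// Hypothetical helper: builds the XML text that would be carried in the JMS text message.
class QuoteRequestPayload {

    // Escapes the characters that must not appear verbatim in XML text content.
    static String escape(String text) {
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    // Builds a complete getQuote request for the given symbol.
    // Element names and namespace are assumed from the SimpleStockQuoteService sample.
    static String getQuote(String symbol) {
        return "<m:getQuote xmlns:m=\"http://services.samples\">"
                + "<m:request><m:symbol>" + escape(symbol) + "</m:symbol></m:request>"
                + "</m:getQuote>";
    }

    public static void main(String[] args) {
        System.out.println(getQuote("MSFT"));
    }
}
```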

package samples.userguide;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.annotation.Generated;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.Properties;

public class GenericJMSClient {
    private QueueConnection connection;
    private QueueSession session;
    private QueueSender sender;
    private TemporaryQueue temporaryQueue;


    public static void main(String[] args) {

        String dest = getProperty("jms_dest", "dynamicQueues/JMSTextProxy");
        String type = getProperty("jms_type", "text");
        String param = getProperty("jms_payload",
                getRandom(100, 0.9, true) + " " + (int) getRandom(10000, 1.0, true) + " IBM");
        String sMsgCount = getProperty("jms_msgcount", null);

        GenericJMSClient app = new GenericJMSClient();
        int msgCount = sMsgCount == null ? 1 : Integer.parseInt(sMsgCount);
        try {
            app.connect(dest);
        } catch (Exception e) {
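            System.err.println("Could not connect to broker : " + e.getMessage());
        }
        if ("text".equalsIgnoreCase(type)) {
            for (int i = 0; i < msgCount; i++) {
                // [...] (rest of main() and the connect/send/receive methods not shown)
            }
        }
    }

    // [...] (start of the helper that reads a file into a byte array not shown)
        while (offset < bytes.length
                && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) {
            offset += numRead;
        }

        // Ensure all the bytes have been read in
        if (offset < bytes.length) {
            // [...]
        }
        // [...]
    }

    // Returns base varied randomly by up to the given fraction of base.
    private static double getRandom(double base, double varience, boolean onlypositive) {
        double rand = Math.random();
        return (base + ((rand > 0.5 ? 1 : -1) * varience * base * rand))
                * (onlypositive ? 1 : (rand > 0.5 ? 1 : -1));
    }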

    private static String getProperty(String name, String def) {
        String result = System.getProperty(name);
        if (result == null || result.length() == 0) {
            result = def;
        }
        return result;
    }
}

Running the scenario 3

  1. Enable the JMS transport sender and receiver in $ESB_HOME/repository/conf/axis2.xml.
    Sample configurations are available in the sample configuration attached in the
    Resource section.
  2. Enable the JMS transport receiver in
    $ESB_HOME/samples/axis2Server/repository/conf/axis2.xml.
  3. Deploy SimpleStockQuoteService as the back-end service.
  4. Replace the client in
    $ESB_HOME/samples/axis2Client/src/samples/userguide/GenericJMSClient.java with the
    attached client.
  5. Once WSO2 ESB is started with this configuration, run the client using the command:

    ant jmsclient -Djms_type=pox -Djms_dest=dynamicQueues/StockQuoteProxy -Djms_payload=MSFT

Conclusion

This guide described three complete two-way (request/response) JMS scenarios that can be
implemented using WSO2 ESB. Although these use cases are straightforward and easy to
implement, they come in handy when implementing a large integration that involves
two-way JMS invocations.

Future work

The guide describes possible two-way JMS scenarios for queue destinations. The same
concepts and configurations can be extended to topic destinations as well.

References:


  1. https://www.oracle.com/technetwork/java/index-jsp-142945.html

  2. https://www.amqp.org/

  3. https://download.oracle.com/javaee/1.3/jms/tutorial/1_3_1-fcs/doc/prog_model.html#1023400

  4. https://wso2.org/project/esb/java/4.0.2/docs/transports/transports-catalog.html#JmsTrp

  5. https://activemq.apache.org/download.html

  6. https://wso2.org/project/esb/java/4.0.2/docs/quickstart_guide.html

Resource

Author

Rajika Kumarasiri, Senior Software Engineer, WSO2 Inc.

 
