2011/12/28
28 Dec, 2011

WSO2 ESB by Example - Pub/Sub in SOA

  • Rajika Kumarasiri
  • Senior Software Engineer - WSO2

Applies To

WSO2 ESB 4.0.0 and above
WSO2 AS 4.1.0 and above
WSO2 MB 1.0.2 and above

Table of Content

Introduction

Java Messaging Service API(JMS) [1] still plays a major role in Enterprise Application
Integration. It may be
the most popular messaging API still in market despite of the recently popular messaging
API such as Advance
Message Queueing Protocol(AMQP)[2]. When comes to integration with JMS there are two
most wildly used and
popular
integration patterns. They are commonly known as the producer-consumer pattern and the
publisher-subscriber
pattern. In publisher-subscriber scenario sender(called publishers) sends messages to
another party or multiple of them(called
subscribers)
who are interested about these message[3]. Although a publisher sends the message to a
subscriber they are
not
directly program to do so. Instead they use the third software component known as the
Message Oriented
Middleware (MOM)
to communicate between them. These MOM commonly known as message brokers and WSO2 Message
Broker(MB) is the
MOM
component from WSO2. WSO2 MB provides your Service Oriented Architecture(SOA) the full
capacity of a MOM
where
it provides the asynchronous message handling for offline clients(for example the
subscriber is offline
when the publisher sends the message and WSO2 MB will keep the message until the
subscriber become available
as any MOM would do), message persistence and message routing based on the message
content.

When comes to
publisher-subscriber there are thousands of use cases that can be implemented using this
pattern. For example
consider a blog and the readers who have subscribed to that. The blog author post a blog
entry which can be
seen by the subscribers of that blog. In other words the blog author publish a message
(the blog post content) and the list of subscribers ( the blog readers) receive that
message. With no doubt WSO2 MB can be used to implement these applications of the publisher-subscriber
patten.

WSO2 ESB has a production ready JMS transport which can be used to implement various
Enterprise Integration
requirements in JMS. This article describes how to implement publisher-subscriber
integration pattern using
various SOA components available as part of the WSO2 SOA stack together with WSO2 ESB.

The first section will cover the scenario briefly and how it fits when comes to SOA. Then
the subsequent
sections will describes different semantic of publisher-subscriber pattern that can be
implemented using
WSO2 MB, WSO2 ESB, WSO2 AS and JMS Java and SOAP clients. These samples will be full
working configurations
and
code examples so anybody can use them as the basis for implementing publisher-subscriber
patten. Although
you may
not used all the scenarios at once they may be the building blocks of a large
integration based on
publisher-subscriber Enterprise Application Integration (EAI) patten. The material is
self explanatory as
much as possible, a basic knowledge of WSO2 ESB
will be helpful. ESB quick start guide[4] is recommended for a new comer.

Pub/Sub in SOA

This section describes an imaginary messaging scenario that can be implemented within
your SOA. It utilizes various products from the WSO2 SOA product stacks to demonstrate
the
capabilities and configurations of each of the product in a distributed messaging
scenario.

The scenario consists of WSO2 MB, which acts as the centralized messaging hub. There are
couple of publishers which publish messages into the broker and there are couple of
subscribers
who receives messages from the broker. The list of publishers are, a proxy service
deployed on
WSO2 ESB, a generic JMS client written using JMS API and a SOAP based JMS client. And
also
the list of subscribers are consists of a proxy service deployed on WSO2 ESB, a web
service
deployed on WSO2 Application server and a generic JMS client written using JMS API. The
wide variety range of configurations options covered using various types of publishers
and
subscribers will cover most of your configuration requirements.

Figure1: A possible pub/sub scenario using WSO2 SOA stack.

Note:
The configurations are given below also can be found in the resource section so that a
user can try them out by themselves. The various
configuration files are group according to the product that is used in the scenario.
How ever in order to run multiple Carbon servers(such as WSO2 MB, WSO2 ESB and WSO2
AS) on the
same host, the offset property in $CARBON_HOME/repository/conf should be set
appropriately.

Using WSO2 MB as the centralized messaging hub

Step1: Download WSO2 MB distribution[5] and go to bin folder and start the start up
script
wso2server.sh{.bat} (depending your platform). This is what needs to be done to start
any of
the Carbon products states below. If that method differs that will be mentioned.

Using a WSO2 ESB proxy service as a subscriber

Now there is a Message Oriented Middleware infrastructure running which can be used by
the publishers and subscribers. Following section describes how to configure a WSO2 ESB's
proxy service as a subscriber. WSO2 ESB has a production ready JMS transport which has a
vast set of configuration options. Information on how to configure the WSO2 ESB JMS
transport
is available at[6].

A JMS proxy will be act as the subscriber to the broker. The destination type can be
given
as the topic and the required connection factories can be defined so that the JMS proxy
will
connect to the centralized WSO2 MB.

Following describes the proxy service configuration(which act as one of the subscribers).
This configuration also available in the sample configuration given at the resource
section. According to this configuration upon receiving a message from the topic the
proxy
will log the message and will drop it. This is just for demonstrate purposes only. Once
the
message is received by WSO2 ESB required mediation and routing logic can be applied.

 <proxy name="SimpleStockQuoteService" transports="jms">
        <target>
            <inSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
                <log level="full"/>
                <drop/>
            </inSequence>
            <outSequence>
                <send/>
            </outSequence>
        </target>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
        <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
        <parameter name="transport.jms.Destination">SimpleStockQuoteService</parameter>
        <parameter name="transport.jms.DestinationType">topic</parameter>
</proxy>

        

Note that a connection factory by the name myTopicConnectionFactory is referred by the
parameter transport.jms.ConnectionFactory in the proxy configuration. This connection
factory is
defined in the JMS transport receiver in the axis2.xml. Also note how the destination
type is set to topic.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/esb-publish-jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.Destination" locked="false">SimpleStockQuoteService</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/esb-publish-jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/esb-publish-jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
</transportReceiver>

        

This receiver configuration uses the jndi property file called
esb-publish-jndi.properties and this is
used by the WSO2 MB client libraries used by WSO2 ESB. The JMS client libraries that
need
to communicate with WSO2 MB, are by default distribute by each of the Carbon products so
a user does not required to copy additional libraries(the client library is located in
$CARBON_HOME/lib/core/WEB-INF/lib/qpid-client-0.11.0.VERSION.jar). This jndi property
file
has the JNDI desecration of the topic that the proxy service will be subscribed. A topic
with the name SimpleStockQuoteService will be used in the scenario.

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.SimpleStockQuoteService = SimpleStockQuoteService
queue.Echo = Echo
queue.Version = Version
        

Deploying the JMS proxy service as a subscriber

Step1: Copy axis2.xml and the esb-publish-jndi.properies to
$ESB_HOME/repository/conf

Step2: Copy synapse_sample_251.xml to $ESB_HOME/repository/samples. This has the
JMS
subscriber configuration described above.

Step3: Start WSO2 ESB with the sample subscriber configuration using
,./wso2esb-samples.sh{.bat} -sn 251

This will deploy the SimpleStockQuoteService proxy service as a subscriber to the
SimpleStockQuoteService
topic in WSO2 MB.

Using WSO2 AS as a subscriber

Let's look how we can deploy a different type of subscriber to the same topic
SimpleStockQuoteService. This time a
axis2 service called SimpleStockQuoteService will be deployed on WSO2 AS as a subscriber
to the same topic.
This service also uses the JNDI property file to subscribes to the same topic and the
required configurations are
provided in the JMS receiver section of axis2.xml. SimpleStockQuoteService service will
generate a stock quote
upon receiving a message from the topic.

Deploying the SimpleStockQuote service as subscriber

Step1: Copy SimpleStockQuoteService.aar into
$AS_HOME/repository/deployment/server/axis2services

Step2: Copy axis2.xml and as-publish-jndi.properties into $AS_HOME/repository/conf

Step3: Start WSO2 AS using, ./wso2server.sh{.bat}

A generic JMS client as a subscriber

The previous two sections describes how to configure a proxy service deployed on WSO2 ESB
and
an axis2 service
deployed on WSO2 AS to subscribed to a topic. This section describes how to
use a JMS Java subscriber to subscribes to the topic. When comes to publisher-subscriber
implementation
it's natural to use a JMS client as the subscriber. In most cases the simple client will be
wrapped in another application to hide the underline messaging infrastructure so users will
be using
the system without knowing that JMS will be used for all client-server communications.

This client will wait for any messages and will dump them to console. The client source
is
available below. It uses the various JMS connection factory definitions that is used to
communicate
with WSO2 MB and also the javax.jms.* API.

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * This is only for demonstration purposes only it lacks of proper exception handling
 */
public class WSO2MBSubscriber {

    public static void main(String[] args) {

        String topicName = "SimpleStockQuoteService";

        Properties properties = new Properties();

        TopicConnection topicConnection = null;

        properties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
        properties.put("connectionfactory.QueueConnectionFactory", connectionString);

        try {
            InitialContext ctx = new InitialContext(properties);
            TopicConnectionFactory topicConnectionFactory =
                    (TopicConnectionFactory) ctx.lookup("QueueConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = topicSession.createTopic(topicName);

            // create a topic subscriber
            TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);

            // start the connection
            topicConnection.start();

            // fix this to suite your requirement, polling in a busy loop is bad
            while (true) {
                // receive the message
                TextMessage textMessage = (TextMessage) topicSubscriber.receive();

                // process the message
                System.out.println("\n\n");
                System.out.println("received : " + textMessage.getText());
                System.out.println("\n\n");
            }


        } catch (NamingException e) {

            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (JMSException e) {
                    // ignore
                }
            }
        }
    }
}
        

Use the command,
ant sub -Dcarbon_home=$CARBON_HOME
to run the Java subscriber.
$CARBON_HOME should point to location of one of the Carbon servers use in the article.

A generic JMS client as a publisher

Now that we have three different types of subscribers it's time to see how we can write a
publisher to publish messages so that those will be consumed by the subscribers. The
first
type of publisher is a Java client. This generic JMS publisher will use the javax.jms.*
API
and the required JMS connection factories to communicate with WSO2 MB. The client source
is given below.

import javax.jms.*;
import javax.naming.InitialContext;
import java.util.Properties;

/**
 * This is only for demonstration purposes, it's lack of proper exception handling
 */
public class WSO2MBPublisher {
    public static void main(String[] args) {
        String topicName = "SimpleStockQuoteService";

        Properties properties = new Properties();

        TopicConnection topicConnection = null;

        properties.put("java.naming.factory.initial",
                "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
        properties.put("connectionfactory.QueueConnectionFactory", connectionString);

        try {
            // initialize the required connection factories
            InitialContext ctx = new InitialContext(properties);
            TopicConnectionFactory topicConnectionFactory =
                    (TopicConnectionFactory) ctx.lookup("QueueConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            // create or use the topic
            Topic topic = topicSession.createTopic(topicName);
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);
            TextMessage textMessage =
                    topicSession.createTextMessage("Hello I am WSO2 MB publisher!");

            // publish the message
            topicPublisher.publish(textMessage);
            topicPublisher.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Use the command,
ant pub -Dcarbon_home=$CARBON_HOME
to run the Java publisher.
$CARBON_HOME should point to location of one of the Carbon servers use in the article.

At this point you may notice that the Java JMS subscriber has received a copy of the
published
message. Others two subscribers also received the message but they can't parse it
because the message
is not XML. We will see a proper message below for this.

A SOAP JMS client as a publisher

This is a another type of publisher that can be used to publish messages into the topic
so that other
subscribers will receive the messages. The main difference with this client and the
generic JMS client
is this client can be used to send SOAP messages using the JMS transport.

Use the command,
ant soapjmsclient -Dsymbol=IBM -DtopicName=SimpleStockQuoteService
-Dcarbon_home=$CARBON_HOME

to run the SOAP JMS publisher. $CARBON_HOME should point to location of one of the
Carbon
servers use in the article

Once you run the client you will notice each of the three
subscribers has
received a copy the message.

The client source is given below.

import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
/**
* Only for demonstration. The source is lack of proper exception handling logic
*/
public class WSO2SOAPJMSClient {
    public static void main(String[] args) {
        String jmsEndpoint = "jms:/SimpleStockQuoteService?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=client_repo/conf/esb-publish-jndi.properties&transport.jms.DestinationType=topic";

        try {
             // create the configuration context and use axis2 client API to send the message
            ConfigurationContext configurationContext =
                    ConfigurationContextFactory.createConfigurationContextFromFileSystem("client_repo",
                            "client_repo/conf/axis2.xml");
            ServiceClient serviceClient = new ServiceClient(configurationContext, null);
            serviceClient.engageModule("addressing");
            OMElement payload = createPlaceOrderRequest(123.32, 12, "IBM");
            Options options = new Options();
            options.setTo(new EndpointReference(jmsEndpoint));
            options.setAction("urn:placeOrder");
            serviceClient.setOptions(options);
            serviceClient.fireAndForget(payload);
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }

    private static OMElement createPlaceOrderRequest(double purchPrice, int qty, String symbol) {
        OMFactory factory = OMAbstractFactory.getOMFactory();
        OMNamespace ns = factory.createOMNamespace("https://services.samples", "m0");
        OMElement placeOrder = factory.createOMElement("placeOrder", ns);
        OMElement order = factory.createOMElement("order", ns);
        OMElement price = factory.createOMElement("price", ns);
        OMElement quantity = factory.createOMElement("quantity", ns);
        OMElement symb = factory.createOMElement("symbol", ns);
        price.setText(Double.toString(purchPrice));
        quantity.setText(Integer.toString(qty));
        symb.setText(symbol);
        order.addChild(price);
        order.addChild(quantity);
        order.addChild(symb);
        placeOrder.addChild(order);
        return placeOrder;
    }
}

Using a WSO2 ESB proxy service as a publisher

A JMS endpoint can be defined in WSO2 ESB which will act as the publisher for a proxy.
The destination type can be given as topic so that the proxy will be act as a publisher.
The JNDI configurations of topic and the connection factories will be given in the JNDI
property file. Additionally JMS transport sender has to enable in axis2.xml.

The publisher configuration is given below. It's a proxy service for WSO2 ESB.

<proxy name="StockQuoteProxy" transports="http">
        <target>
            <endpoint>
                <address
                        uri="jms:/SimpleStockQuoteService?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=repository/conf/esb-publish-jndi.properties&transport.jms.DestinationType=topic"/>
            </endpoint>
            <inSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
            </inSequence>
            <outSequence>
                <send/>
            </outSequence>
        </target>
</proxy>

Deploying a proxy service as a publisher

Step1: Copy the axis2.xml and esb-publish-jndi.properties into $ESB_HOME/repository/conf

Step2: Copy the proxy service configuration file synapse_sample_251.xml into $ESB_HOME/repository/samples

Step3: Start the server with the command ./wso2esb-samples.sh{.bat} -sn 251

Step4: Invoke the proxy with a sample client that comes with WSO2 ESB. Go to $ESB_HOME/samples/axis2Client and
invoke the following command.

ant stockquote -Daddurl=https://localhost:8280/services/StockQuoteProxy -Dmode=placeorder -Dsymbol=MSFT

The proxy service will publish the message to the topic, which will be received by the three subscribers.

Conclusion

The article describes a set of commons scenarios that can be used to implement publisher-subscriber pattern using various
components in WSO2 middleware stack. Although a user will not use the full scenario as it is those individual scenarios will be
used as a part of a large integration.

Future work

This article copies most of the configurations, services and other artifacts manually to run the each of the
scenario. But the user interfaces and the API available via admin services can be used to avoid manual copying of artifacts and
automate the process.

References:


  1. https://www.oracle.com/technetwork/java/index-jsp-142945.html
  2. https://www.amqp.org/

  3. https://eaipatterns.com/PublishSubscribeChannel.html

  4. https://wso2.org/project/esb/java/4.0.2/docs/quickstart_guide.html

  5. https://wso2.com/products/message-broker

  6. https://wso2.org/project/esb/java/4.0.2/docs/transports/transports-catalog.html#JmsTrp

Further Reading


  1. Messaging and Eventing in SOA

Resources

Author

Rajika Kumarasiri, Senior Software Engineer, [email protected]

 

About Author

  • Rajika Kumarasiri
  • Senior Software Engineer
  • WSO2 Inc.