2013/03/21

Configuring WSO2 ESB with WSO2 Message Broker

  • Hasitha Abeykoon
  • Associate Technical Lead - WSO2

Applies to

WSO2 Message Broker 2.0.0
WSO2 ESB 4.5.0

Content

  1. Introduction
  2. Use Cases
  3. Configure WSO2 Message Broker
    1. Configure Message Broker to Start with Port Offset
    2. Configure Message Store
  4. Configure WSO2 Enterprise Service Bus
  5. Routing Messages to Message Broker
  6. Receiving JMS Messages
  7. Conclusion
  8. References

Introduction

WSO2 Enterprise Service Bus (ESB) is a lightweight, high-performance enterprise service bus known for its ease of use and first-class support for enterprise application integration patterns.
With its new version (2.0.0), WSO2 Message Broker provides high-speed messaging and uses Apache Cassandra as its internal message store.
Because it can be clustered, WSO2 Message Broker can scale to cope with higher messaging demands.
This article is a step-by-step guide to configuring WSO2 Message Broker 2.0.0 [1] with WSO2 ESB 4.5.0 [2].

Configure WSO2 Message Broker

WSO2 Message Broker has some prerequisites that must be met before it can run on a system; see
WSO2 Message Broker Installation Prerequisites.
In this guide we start Message Broker in standalone mode using the built-in Cassandra message store (alternatively, you can
point to an external Cassandra instance). Because the ESB will run on the same machine, Message Broker must be configured
to start with a port offset.

  • Configure Message Broker to Start with Port Offset

    Navigate to the [MB_HOME]/repository/conf folder and edit the following line in the carbon.xml file to set the offset to "1".

                        <Ports>
    
                        <!-- Ports offset. This entry will set the value of the ports defined below to
                         the define value + Offset.
                         e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
                         -->
                        <Offset>1</Offset>
                    

    With this offset, the JMS broker will be available on port "5673" (the default port 5672 plus the offset).

  • Configure Message Store

    WSO2 Message Broker uses a Cassandra database as its internal message store. When the Message Broker server is started with a port offset of "1", the default
    port of the internal Cassandra instance (9160) is shifted by that offset. The message store configuration
    of Message Broker must therefore point to port 9161.

    Navigate to the [MB_HOME]/repository/conf/advanced folder and edit the qpid-virtualhosts.xml file as follows.

                          <virtualhost>
                            <name>carbon</name>
                            <carbon>
                               <store>
                                    <class>org.wso2.andes.server.store.CassandraMessageStore</class>
                                    <username>admin</username>
                                    <password>admin</password>
                                    <cluster>ClusterOne</cluster>
                                    <idGenerator>org.wso2.andes.server.cluster.coordination.TimeStampBasedMessageIdGenerator</idGenerator>
                                    <connectionString>localhost:9161</connectionString>
                                </store>
                    

    Now start WSO2 Message Broker by navigating to [MB_HOME]/bin and running wso2server.sh (or wso2server.bat on Windows).
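
The port arithmetic above can be sketched in a few lines of Java. This is only an illustration, not part of the product: the class and method names are hypothetical, and 5672 is taken as the default AMQP port because the article gives 5673 as the result of offset 1. The probe simply tries to open a TCP connection, which is one rough way to check that the broker has started.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch: derive effective ports from a Carbon port offset and probe them. */
final class PortOffsets {
    // Default ports before any offset is applied.
    static final int DEFAULT_AMQP_PORT = 5672;      // JMS/AMQP broker port
    static final int DEFAULT_CASSANDRA_PORT = 9160; // internal Cassandra port
    static final int DEFAULT_HTTPS_PORT = 9443;     // port used in the carbon.xml comment

    /** Effective port = default port + offset, as described in carbon.xml. */
    static int effectivePort(int defaultPort, int offset) {
        int port = defaultPort + offset;
        if (offset < 0 || port > 65535) {
            throw new IllegalArgumentException("invalid offset " + offset + " for port " + defaultPort);
        }
        return port;
    }

    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int offset = 1; // value set in [MB_HOME]/repository/conf/carbon.xml
        int amqpPort = effectivePort(DEFAULT_AMQP_PORT, offset);
        int cassandraPort = effectivePort(DEFAULT_CASSANDRA_PORT, offset);
        System.out.println("JMS broker port: " + amqpPort
                + ", listening: " + isListening("localhost", amqpPort, 1000));
        System.out.println("Cassandra port: " + cassandraPort
                + ", listening: " + isListening("localhost", cassandraPort, 1000));
    }
}
```

If both probes report false after the server has finished starting, the offset in carbon.xml and the connectionString in qpid-virtualhosts.xml are the first things to re-check.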

Configure WSO2 Enterprise Service Bus

WSO2 ESB needs some client libraries to communicate with WSO2 Message Broker. They are available in the [MB_HOME]/client-lib folder. Copy the following libraries
to the [ESB_HOME]/repository/components/lib folder.

  • andes-client-0.13.wso2v3
  • geronimo-jms_1.1_spec-1.1.0.wso2v1

Some configuration is needed on the ESB side to enable JMS messaging. Proxy services can then be written to route messages to JMS
queues and topics, and to listen to JMS queues and topics. The following sections describe the steps.

Routing Messages to Message Broker

The ESB can route the messages it receives into queues or topics in Message Broker. Its configuration is flexible, so any broker
supporting JMS can be used. To send messages using JMS, the JMS transport sender must be enabled at the Axis2 level. We will use a common Axis2-level configuration for routing messages to both queues and topics, and specify the destination to which each message should be routed at the service level.

  • Uncomment the JMS TransportSender under Transport Outs in the [ESB_HOME]/repository/conf/axis2/axis2.xml file, and edit it as follows.
                <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
                    <parameter name="default" locked="false">
                        <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                        <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
                    </parameter>
                </transportSender>
                
  • Define what "ConnectionFactory" refers to in the [ESB_HOME]/repository/conf/jndi.properties file.
    connectionfactory.ConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
                

Note that if you want to use only queues or only topics, <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> or <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter> can be added under the "default" parameter above. At the ESB service level, the "default" connection factory is always picked up, so only the queue name or topic name needs to be given at the service level, which is cleaner.
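
The connection URL in jndi.properties packs several settings into one string: user name, password, client ID, virtual host, and the broker host and port. The following sketch shows how the parts fit together. The class and method names are hypothetical; the values are the ones used in this article.

```java
/** Sketch: assemble the AMQP connection URL used in jndi.properties. */
final class AndesConnectionUrl {

    /** Builds amqp://{user}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{host}:{port}' */
    static String build(String user, String password, String clientId,
                        String virtualHost, String host, int port) {
        return "amqp://" + user + ":" + password
                + "@" + clientId
                + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    /** Formats one jndi.properties line for a connection factory with the given JNDI name. */
    static String jndiEntry(String factoryJndiName, String connectionUrl) {
        return "connectionfactory." + factoryJndiName + " = " + connectionUrl;
    }

    public static void main(String[] args) {
        // 5673 is the broker port after applying port offset 1.
        String url = build("admin", "admin", "clientID", "carbon", "localhost", 5673);
        System.out.println(jndiEntry("ConnectionFactory", url));
    }
}
```

Only the port changes if Message Broker is started with a different offset; the virtual host "carbon" matches the name set in qpid-virtualhosts.xml.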

Sending a Message to a Queue

Note that before messages are sent to a queue, the queue must already exist and must have at least one subscription.
Both requirements can be fulfilled by making a JMS subscription to that queue: run either an external JMS client [3] or the JMS listener proxy described
under Receiving a Message from a Queue before following the steps below.

  1. JNDI.properties

    The queues to be used must be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
    we specify a queue called "myQueue" as below.

        # queue.[jndiName] = [physicalName]
        queue.myQueue = myQueue
                    

    For ease of explanation, the JNDI name is the same as the physical queue name here, but the two need not match. When
    the queue is referred to, the JNDI name given in this file should be used.

  2. Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
  3. The following proxy routes a message it receives into the JMS queue. No response is expected.
                   <proxy name="StockQuoteProxy" transports="http" startOnLoad="true">
                      <target>
                         <endpoint>
                            <address uri="jms:/myQueue?&amp;transport.jms.DestinationType=queue"/>
                         </endpoint>
                         <inSequence>
                            <log level="custom">
                               <property name="STATE" value="message is sent to queue"/>
                            </log>
                            <property name="OUT_ONLY" value="true"/>
                            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                         </inSequence>
                         <outSequence/>
                      </target>
                   </proxy>

  4. Using SoapUI, send the following message to the above proxy.
                    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
                       <soapenv:Header/>
                       <soapenv:Body>
                          <ser:getQuote>
                             <!--Optional:-->
                             <ser:request>
                                <!--Optional:-->
                                <xsd:symbol>IBM</xsd:symbol>
                             </ser:request>
                          </ser:getQuote>
                       </soapenv:Body>
                    </soapenv:Envelope>

  5. The message count for the queue 'myQueue' will increase in the Management Console of Message Broker. A JMS client can also be written to consume
    messages from the queue [3].
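
The same request can also be sent from code instead of SoapUI. The sketch below uses the HTTP client that ships with Java 11 and later. Several details are assumptions rather than facts from this article: the proxy URL (an ESB on localhost with HTTP port 8280), the SOAPAction value "urn:getQuote", and all class and method names. It uses the standard SOAP 1.1 envelope namespace.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Sketch: post the getQuote request to the StockQuoteProxy over HTTP. */
final class StockQuoteRequest {
    // Assumption: ESB runs locally and serves proxies on HTTP port 8280.
    static final String PROXY_URL = "http://localhost:8280/services/StockQuoteProxy";

    /** Builds the SOAP 1.1 envelope used in this article for the given symbol. */
    static String buildEnvelope(String symbol) {
        return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\""
                + " xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">"
                + "<soapenv:Header/><soapenv:Body><ser:getQuote><ser:request>"
                + "<xsd:symbol>" + escapeXml(symbol) + "</xsd:symbol>"
                + "</ser:request></ser:getQuote></soapenv:Body></soapenv:Envelope>";
    }

    /** Escapes the characters that are not allowed in XML text content. */
    static String escapeXml(String text) {
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Posts the envelope and returns the HTTP status code. */
    static int send(String url, String envelope) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(10))
                .header("Content-Type", "text/xml; charset=UTF-8")
                .header("SOAPAction", "\"urn:getQuote\"") // assumed action name
                .POST(HttpRequest.BodyPublishers.ofString(envelope))
                .build();
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    public static void main(String[] args) {
        try {
            System.out.println("HTTP status: " + send(PROXY_URL, buildEnvelope("IBM")));
        } catch (IOException e) {
            System.out.println("Could not reach the proxy: " + e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the proxy sets OUT_ONLY and FORCE_SC_ACCEPTED, the expected reply is an empty HTTP 202 (Accepted) response rather than a SOAP body.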

Sending a Message to a Topic

Note that before messages are sent to a topic, at least one subscription for that topic must exist in Message Broker; otherwise the messages will be dropped, as there is no route for them. This requirement can be fulfilled by making a JMS subscription to the topic: run either an external JMS client [3] or the JMS listener proxy described under Receiving a Message from a Topic before following the steps below. A sample JMS subscriber is presented at the end of this section.

  1. JNDI.properties

    The topics to be used must be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
    we specify a topic called "myTopic" as below.

        # topic.[jndiName] = [physicalName]
        topic.myTopic = myTopic
                            

    For ease of explanation, the JNDI name is the same as the physical topic name here, but the two need not match. When
    the topic is referred to, the JNDI name given in this file should be used.

  2. Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
  3. The following proxy routes a message it receives to a JMS topic. No response is expected.
                           <proxy name="StockQuoteProxy" transports="http" startOnLoad="true">
                              <target>
                                 <endpoint>
                                    <address uri="jms:/myTopic?&amp;transport.jms.DestinationType=topic"/>
                                 </endpoint>
                                 <inSequence>
                                    <log level="custom">
                                       <property name="STATE" value="message is sent to topic"/>
                                    </log>
                                    <property name="OUT_ONLY" value="true"/>
                                    <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                                 </inSequence>
                                 <outSequence/>
                              </target>
                           </proxy>

  4. Using SoapUI, send the following message to the above proxy.
                            <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
                               <soapenv:Header/>
                               <soapenv:Body>
                                  <ser:getQuote>
                                     <!--Optional:-->
                                     <ser:request>
                                        <!--Optional:-->
                                        <xsd:symbol>IBM</xsd:symbol>
                                     </ser:request>
                                  </ser:getQuote>
                               </soapenv:Body>
                            </soapenv:Envelope>

  5. To verify the scenario, use either a JMS listener client or the JMS topic listener proxy described below under Receiving a Message from a Topic.
    A sample JMS client for receiving messages can be written as follows. If several instances of this client are started before the message is sent
    to the topic, each of them receives its own copy of the message, as they all listen to the same topic 'myTopic'.

    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (https://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    https://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
     
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;

    public class TopicSubscriber {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        private static final String CARBON_CLIENT_ID = "carbon";
        private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
        // Broker port with port offset 1 applied
        private static final String CARBON_BROKER_PORT = "5673";
        String userName = "admin";
        String password = "admin";
        String topicName = "myTopic";

        public static void main(String[] args) throws NamingException, JMSException {
            TopicSubscriber topicSubscriber = new TopicSubscriber();
            topicSubscriber.subscribe();
        }

        public void subscribe() throws NamingException, JMSException {
            // Pass the connection factory definition to JNDI as environment properties
            String connectionUrl = getTCPConnectionURL(userName, password);
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, connectionUrl);
            System.out.println("getTCPConnectionURL(userName,password) = " + connectionUrl);
            InitialContext ctx = new InitialContext(properties);
            // Look up the connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            try {
                topicConnection.start();
                TopicSession topicSession =
                        topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                // Subscribe to the topic and block until one message arrives
                Topic topic = topicSession.createTopic(topicName);
                javax.jms.TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
                Message message = topicSubscriber.receive();
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("textMessage.getText() = " + textMessage.getText());
                }
                topicSession.close();
            } finally {
                // Always release the connection, even if receiving fails
                topicConnection.close();
            }
        }

        public String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuilder()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_BROKER_PORT).append("'")
                    .toString();
        }
    }
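
The two delivery rules mentioned in this section (a topic message is dropped when there is no subscription, and every subscriber gets its own copy otherwise) can be illustrated with a toy in-memory model. This is not how Message Broker is implemented and it does not talk to a broker; the class and method names are hypothetical and exist only to make the behaviour concrete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of topic delivery; it does not connect to a real broker. */
final class TopicFanOutModel {
    // topic name -> one inbox per subscriber
    private final Map<String, List<List<String>>> inboxesByTopic = new HashMap<>();

    /** Registers a new subscriber on the topic and returns its inbox. */
    List<String> subscribe(String topic) {
        List<String> inbox = new ArrayList<>();
        inboxesByTopic.computeIfAbsent(topic, name -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    /** Copies the message to every subscriber; returns the number of copies delivered. */
    int publish(String topic, String message) {
        List<List<String>> inboxes = inboxesByTopic.getOrDefault(topic, List.of());
        for (List<String> inbox : inboxes) {
            inbox.add(message);
        }
        return inboxes.size(); // 0 means nobody was subscribed and the message is lost
    }

    public static void main(String[] args) {
        TopicFanOutModel broker = new TopicFanOutModel();
        System.out.println("copies delivered before subscribing: "
                + broker.publish("myTopic", "early message"));
        List<String> first = broker.subscribe("myTopic");
        List<String> second = broker.subscribe("myTopic");
        System.out.println("copies delivered after subscribing: "
                + broker.publish("myTopic", "getQuote IBM"));
        System.out.println("first inbox: " + first + ", second inbox: " + second);
    }
}
```

This is why the subscriber client (or the listener proxy) has to be running before the message is posted to the topic proxy.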
                

Receiving JMS Messages

WSO2 ESB can listen to a queue or a topic on WSO2 Message Broker. Its configuration is flexible, so any broker
supporting JMS can be used. To receive messages using JMS, the JMS transport receiver must be enabled at the Axis2 level. Uncomment the JMS TransportReceiver section
specific to WSO2 Message Broker 2.x.x under Transport Ins (Listeners) in the [ESB_HOME]/repository/conf/axis2/axis2.xml file, and edit it as follows.

  • If you want to listen for messages in a queue
                    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
                        <parameter name="myQueueConnectionFactory" locked="false">
                            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
                        </parameter>
                    </transportReceiver>
                    
  • If you want to listen for messages in a topic
                    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
                        <parameter name="myTopicConnectionFactory" locked="false">
                           <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
                            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
                        </parameter>
                    </transportReceiver>
                    

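Note that if you want to use only queues or only topics, the connection factory can be named "default" instead of "myQueueConnectionFactory" or "myTopicConnectionFactory".
At the ESB service level, the "default" connection factory is picked up when a service does not name one. In the default axis2.xml file of the ESB, a configuration similar to "myQueueConnectionFactory"
is set as default. To listen to both queues and topics, place both connection factory parameters inside a single transportReceiver element.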

The above configuration uses the jndi.properties file, which also needs appropriate modifications. Edit the [ESB_HOME]/repository/conf/jndi.properties file to
point to the running Message Broker by editing the following lines.

        connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
        connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
    

Note that QueueConnectionFactory and TopicConnectionFactory are the names referenced by the ESB Axis2 configurations above.
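
The naming convention in jndi.properties (a prefix such as connectionfactory., queue. or topic., followed by the JNDI name) can be checked with plain java.util.Properties before the ESB is started. The following is a minimal sketch with hypothetical class and method names. It reads the entries from a string so that it is self-contained; a real check would load the file from [ESB_HOME]/repository/conf instead.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Sketch: list the JNDI names defined in jndi.properties, grouped by prefix. */
final class JndiPropertiesReader {

    /** Parses jndi.properties content using the standard properties format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns JNDI name -> value for all keys that start with the given prefix. */
    static Map<String, String> entriesWithPrefix(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String url = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
        String sample = String.join("\n",
                "connectionfactory.QueueConnectionFactory = " + url,
                "connectionfactory.TopicConnectionFactory = " + url,
                "# queue.[jndiName] = [physicalName]",
                "queue.myQueue = myQueue",
                "topic.myTopic = myTopic");
        Properties props = parse(sample);
        System.out.println("connection factories: " + entriesWithPrefix(props, "connectionfactory."));
        System.out.println("queues: " + entriesWithPrefix(props, "queue."));
        System.out.println("topics: " + entriesWithPrefix(props, "topic."));
    }
}
```

The names printed for connection factories are the ones that transport.jms.ConnectionFactoryJNDIName in axis2.xml must match.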

Receiving a Message from a Queue

Make sure there are messages in the queue that the listener is set to before running the following configuration. To send messages to the queue, a JMS
client or the proxy described under Sending a Message to a Queue can be used.

  1. JNDI.properties

    The queues to be listened to must be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
    we specify a queue called "myQueue" as below.

        # queue.[jndiName] = [physicalName]
        queue.myQueue = myQueue
                            
  2. Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
  3. The following proxy listens to the JMS queue specified by the "transport.jms.Destination" parameter.
                       <proxy name="JMSQueueListenerProxy" transports="jms">
                            <target>
                                <inSequence>
                                    <property action="set" name="OUT_ONLY" value="true"/>
                                    <log level="full"/>
                                    <drop/>
                                </inSequence>
                                <outSequence>
                                </outSequence>
                            </target>
                            <parameter name="transport.jms.ContentType">
                                <rules>
                                    <jmsProperty>contentType</jmsProperty>
                                    <default>application/xml</default>
                                </rules>
                            </parameter>
                            <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
                            <parameter name="transport.jms.Destination">myQueue</parameter>
                            <parameter name="transport.jms.DestinationType">queue</parameter>
                       </proxy>
                    

    This proxy is configured to use the JMS transport (transports="jms"). The "transport.jms.ContentType" parameter tells the transport how to determine
    the content type of a received message: here it is read from the JMS property "contentType", with "application/xml" as the default. As soon as a message is available on the
    queue "myQueue", this proxy receives it and acknowledges WSO2 MB, which then removes the message from the queue. As this is
    a sample configuration, the proxy logs the message and drops it. In a real scenario, the message could be sent to another queue, routed to an endpoint based on
    a parameter in the message body, or delivered to a service specified in its headers.
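
The rules under "transport.jms.ContentType" can be read as a fallback chain: take the content type from the named JMS property if the message carries it, otherwise use the default. The sketch below models that reading with a plain map standing in for the JMS message properties. It is an illustration of the idea, not the transport's actual code, and the class and method names are hypothetical.

```java
import java.util.Map;

/** Sketch: resolve a content type from a JMS property with a default fallback. */
final class ContentTypeRules {

    /**
     * Returns the value of the given JMS property if it is present and non-blank,
     * otherwise the default content type.
     */
    static String resolve(Map<String, String> jmsProperties,
                          String jmsPropertyName, String defaultContentType) {
        String fromMessage = jmsProperties.get(jmsPropertyName);
        if (fromMessage != null && !fromMessage.trim().isEmpty()) {
            return fromMessage.trim();
        }
        return defaultContentType;
    }

    public static void main(String[] args) {
        // Message that carries its own content type in the "contentType" property
        System.out.println(resolve(Map.of("contentType", "text/xml"),
                "contentType", "application/xml"));
        // Message without the property falls back to the default
        System.out.println(resolve(Map.of(), "contentType", "application/xml"));
    }
}
```

A producer that wants its messages handled as something other than "application/xml" would therefore set the "contentType" property on each JMS message it sends.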

Receiving a Message from a Topic

Messages must be published to the topic while the listener is running, because messages sent to a topic that has no subscription are dropped. To send messages to the topic, a JMS
client or the proxy described under Sending a Message to a Topic can be used.

  1. JNDI.properties

    The topics to be listened to must be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
    we specify a topic called "myTopic" as below.

        # topic.[jndiName] = [physicalName]
        topic.myTopic = myTopic
                                    
  2. Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
  3. The following proxy listens to the JMS topic specified by the "transport.jms.Destination" parameter.
                               <proxy name="JMSTopicListenerProxy" transports="jms">
                                    <target>
                                        <inSequence>
                                            <property action="set" name="OUT_ONLY" value="true"/>
                                            <log level="full"/>
                                            <drop/>
                                        </inSequence>
                                        <outSequence>
                                        </outSequence>
                                    </target>
                                    <parameter name="transport.jms.ContentType">
                                        <rules>
                                            <jmsProperty>contentType</jmsProperty>
                                            <default>application/xml</default>
                                        </rules>
                                    </parameter>
                                    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
                                    <parameter name="transport.jms.Destination">myTopic</parameter>
                                    <parameter name="transport.jms.DestinationType">topic</parameter>
                               </proxy>
                            

    As this is a sample configuration, the proxy logs the message and drops it. In a real scenario, the message could be sent to another queue, routed to an endpoint based on
    a parameter in the message body, or delivered to a service specified in its headers.

Conclusion

WSO2 ESB and WSO2 Message Broker can easily be configured together to implement useful messaging patterns. In this article we discussed how to route messages
to queues and topics of WSO2 Message Broker, and how to configure the ESB to listen to queues and topics. These configurations are important when
implementing robust messaging in which messages can be recovered after a failure during a transaction.

References

  1. WSO2 Message Broker
  2. WSO2 Enterprise Service Bus
  3. JMS Clients Samples - WSO2 Message Broker

Author

Hasitha Abeykoon, Software Engineer, WSO2 inc.

 
