Configuring WSO2 ESB with WSO2 Message Broker
- Hasitha Abeykoon
- Associate Technical Lead - WSO2
Applies to
WSO2 Message Broker | 2.0.0
WSO2 ESB | 4.5.0
Content
- Introduction
- Use Cases
- Configure WSO2 Message Broker
- Configure WSO2 Enterprise Service Bus
- Routing Messages to Message Broker
- Receiving JMS Messages
- Conclusion
- References
Introduction
WSO2 Enterprise Service Bus (ESB) is a lightweight enterprise service bus recognized for its performance, user friendliness, and first-class support for enterprise application integration patterns.
In its new version, WSO2 Message Broker provides high-speed messaging, using Apache Cassandra as its internal message store.
Because it can be clustered, WSO2 Message Broker can scale to cope with higher messaging demands.
This article is a step-by-step guide to configuring WSO2 Message Broker 2.0.0 [1] with WSO2 ESB 4.5.0 [2].
Configure WSO2 Message Broker
WSO2 Message Broker has some prerequisites that must be met before it can run on a system. Please see
WSO2 Message Broker Installation Prerequisites.
In this guide we start Message Broker in standalone mode using the built-in Cassandra message store (alternatively, you can
point to an external Cassandra instance). Because the ESB will run on the same machine, Message Broker should be configured
to start with a port offset.
- Configure Message Broker to Start with Port Offset
Navigate to the [MB_HOME]/repository/conf folder and edit the following line in the carbon.xml file to set the offset to "1".
    <Ports>
        <!-- Ports offset. This entry will set the value of the ports defined below to
             the define value + Offset.
             e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445 -->
        <Offset>1</Offset>
Now the JMS broker will be available on port "5673" (the default port 5672 plus the offset).
- WSO2 Message Broker uses a Cassandra database as its internal message store. When the Message Broker server is started with a port offset of "1", the default
port of the internal Cassandra instance (9160) is shifted by the same offset. The message store configuration
of Message Broker should therefore point to port 9161. Navigate to the [MB_HOME]/repository/conf/advanced folder and edit the qpid-virtualhosts.xml file as follows.
    <virtualhost>
        <name>carbon</name>
        <carbon>
            <store>
                <class>org.wso2.andes.server.store.CassandraMessageStore</class>
                <username>admin</username>
                <password>admin</password>
                <cluster>ClusterOne</cluster>
                <idGenerator>org.wso2.andes.server.cluster.coordination.TimeStampBasedMessageIdGenerator</idGenerator>
                <connectionString>localhost:9161</connectionString>
            </store>
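The port arithmetic used in the two steps above can be sketched in a few lines of Java. This is only an illustration: the class and method names are hypothetical, and the default ports (5672 for the JMS/AMQP listener, 9160 for Cassandra, 9443 for HTTPS) are the ones implied by this article.

```java
// Hypothetical helper: shows how a Carbon port offset shifts the default ports.
final class PortOffsetSketch {

    /** Returns the port a server listens on once the offset from carbon.xml is applied. */
    static int effectivePort(int defaultPort, int offset) {
        return defaultPort + offset;
    }

    public static void main(String[] args) {
        int offset = 1; // value of <Offset> in carbon.xml
        System.out.println("JMS broker port: " + effectivePort(5672, offset)); // 5673
        System.out.println("Cassandra port:  " + effectivePort(9160, offset)); // 9161
    }
}
```

If a different offset is chosen, both the broker port used in jndi.properties and the Cassandra port in qpid-virtualhosts.xml have to be adjusted by the same amount.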
Now start WSO2 Message Broker by navigating to [MB_HOME]/bin and running the wso2server.sh file (or wso2server.bat on Windows).
Configure WSO2 Enterprise Service Bus
WSO2 ESB needs some libraries to communicate with WSO2 Message Broker. They are available in the [MB_HOME]/client-lib folder. The following libraries should be
copied to the [ESB_HOME]/repository/components/lib folder.
- andes-client-0.13.wso2v3
- geronimo-jms_1.1_spec-1.1.0.wso2v1
Some configuration is needed to enable JMS messaging on the ESB side. Proxy services can then be written to route messages to JMS
queues and topics, and to listen to JMS queues and topics. The following sections describe the steps for each.
Routing Messages to Message Broker
The ESB can route the messages it receives into queues or topics in Message Broker. Its configuration is flexible, so any broker
that supports JMS can be used. To send messages over JMS, the JMS transport sender must be enabled at the Axis2 level. We will use one common Axis2-level configuration for routing messages to both queues and topics, and specify the destination of each message at the service level.
- Uncomment the JMS TransportSender under Transport Outs in the [ESB_HOME]/repository/conf/axis2/axis2.xml file, and edit it as follows.
    <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
        <parameter name="default" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
        </parameter>
    </transportSender>
- Define "ConnectionFactory" in the [ESB_HOME]/repository/conf/jndi.properties file.
connectionfactory.ConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
Note that if you want to use only queues or only topics, <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> or <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter> can be added under the "default" parameter above. At the ESB service level, "default" is always picked up, so a service only needs to give the queue name or topic name, which is cleaner.
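The connection URL in jndi.properties is built from the user name, password, client ID, virtual host, broker host, and broker port. As a rough sketch, the value could be assembled as shown here; the class and method names are hypothetical, and the parts passed in are simply the ones used in this article.

```java
// Hypothetical helper: assembles the AMQP connection URL used in jndi.properties.
final class AmqpUrlSketch {

    /** Builds amqp://{user}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{host}:{port}' */
    static String connectionUrl(String user, String password, String clientId,
                                String virtualHost, String host, int port) {
        return "amqp://" + user + ":" + password
                + "@" + clientId
                + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    public static void main(String[] args) {
        // Port 5673 assumes the broker was started with a port offset of 1.
        System.out.println("connectionfactory.ConnectionFactory = "
                + connectionUrl("admin", "admin", "clientID", "carbon", "localhost", 5673));
    }
}
```

Keeping the port as a separate argument makes it easy to see which part of the URL must change when the broker's port offset changes.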
Sending a Message to a Queue
Note that before messages are sent to a queue, the queue must already exist and must have at least one subscription.
Both requirements can be met by making a JMS subscription to that queue, either with an external JMS client [3] or by running the JMS listener proxy described
under Receiving a Message from a Queue beforehand.
- JNDI.properties
The queues to be created should be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
we specify a queue called "myQueue" as below.
    # queue.[jndiName] = [physicalName]
    queue.myQueue = myQueue
For ease of explanation, the JNDI name is the same as the physical queue name here, but the two need not match. Whenever
the queue is referred to, the JNDI name given in this file should be used.
- Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
- The following proxy routes each message it receives into the JMS queue. We do not expect a response here.
    <proxy name="StockQuoteProxy" transports="http" startOnLoad="true">
        <target>
            <endpoint>
                <address uri="jms:/myQueue?&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
            <inSequence>
                <log level="custom">
                    <property name="STATE" value="message is sent to queue"/>
                </log>
                <property name="OUT_ONLY" value="true"/>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            </inSequence>
            <outSequence/>
        </target>
    </proxy>
- Using SOAP UI, send the following message to the above proxy.
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
        <soapenv:Header/>
        <soapenv:Body>
            <ser:getQuote>
                <!--Optional:-->
                <ser:request>
                    <!--Optional:-->
                    <xsd:symbol>IBM</xsd:symbol>
                </ser:request>
            </ser:getQuote>
        </soapenv:Body>
    </soapenv:Envelope>
- The message count for the queue 'myQueue' will increase in the Management Console of Message Broker. A JMS client can also be written to consume
messages from the queue [3].
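If you would rather produce the request programmatically than with SOAP UI, the payload could be built and checked for well-formedness along these lines. This is a minimal sketch: the class and method names are hypothetical, the namespaces are the standard SOAP 1.1 envelope namespace and the sample service namespaces (http://services.samples and http://services.samples/xsd), and the symbol is assumed to be plain text that needs no XML escaping. Sending the request is left out because the proxy address depends on your setup.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

// Hypothetical helper: builds the getQuote request and reads the symbol back from it.
final class QuoteRequestSketch {
    static final String SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/";
    static final String SERVICE_NS = "http://services.samples";
    static final String XSD_NS = "http://services.samples/xsd";

    /** Builds the SOAP 1.1 request; the symbol is assumed to contain no XML special characters. */
    static String envelope(String symbol) {
        return "<soapenv:Envelope xmlns:soapenv=\"" + SOAP_NS + "\""
                + " xmlns:ser=\"" + SERVICE_NS + "\" xmlns:xsd=\"" + XSD_NS + "\">"
                + "<soapenv:Header/>"
                + "<soapenv:Body><ser:getQuote><ser:request>"
                + "<xsd:symbol>" + symbol + "</xsd:symbol>"
                + "</ser:request></ser:getQuote></soapenv:Body>"
                + "</soapenv:Envelope>";
    }

    /** Parses the request with a namespace-aware parser and returns the symbol text. */
    static String symbolOf(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getElementsByTagNameNS(XSD_NS, "symbol").item(0).getTextContent();
        } catch (Exception e) {
            throw new IllegalStateException("Could not read the symbol from the request", e);
        }
    }

    public static void main(String[] args) {
        String request = envelope("IBM");
        System.out.println("symbol = " + symbolOf(request));
    }
}
```

Parsing the payload before sending it catches namespace or nesting mistakes early, before the message ever reaches the proxy.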
Sending a Message to a Topic
Note that before messages are sent to a topic, Message Broker must have at least one subscription for that topic; otherwise the messages are dropped, because there is no route for them. This requirement can be met by making a JMS subscription to the topic, either with an external JMS client [3] or by running the JMS listener proxy described under Receiving a Message from a Topic beforehand. A typical JMS receiver is presented in section 5.
- JNDI.properties
The topics to be created should be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
we specify a topic called "myTopic" as below.
    # topic.[jndiName] = [physicalName]
    topic.myTopic = myTopic
For ease of explanation, the JNDI name is the same as the physical topic name here, but the two need not match. Whenever
the topic is referred to, the JNDI name given in this file should be used.
- Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
- The following proxy routes each message it receives to the JMS topic. We do not expect a response here.
    <proxy name="StockQuoteProxy" transports="http" startOnLoad="true">
        <target>
            <endpoint>
                <address uri="jms:/myTopic?&amp;transport.jms.DestinationType=topic"/>
            </endpoint>
            <inSequence>
                <log level="custom">
                    <property name="STATE" value="message is sent to topic"/>
                </log>
                <property name="OUT_ONLY" value="true"/>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            </inSequence>
            <outSequence/>
        </target>
    </proxy>
- Using SOAP UI, send the following message to the above proxy.
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
        <soapenv:Header/>
        <soapenv:Body>
            <ser:getQuote>
                <!--Optional:-->
                <ser:request>
                    <!--Optional:-->
                    <xsd:symbol>IBM</xsd:symbol>
                </ser:request>
            </ser:getQuote>
        </soapenv:Body>
    </soapenv:Envelope>
- To verify the scenario, use a JMS listener client or the JMS topic listener proxy described below under Receiving a Message from a Topic.
A sample JMS client for receiving messages can be written as follows. If several instances of this client are started before the message is sent
to the topic, each of them receives its own copy of the message, because they all listen to the same topic 'myTopic'.
    /*
     * Copyright (c) 2005-2010, WSO2 Inc. (https://www.wso2.org) All Rights Reserved.
     *
     * WSO2 Inc. licenses this file to you under the Apache License,
     * Version 2.0 (the "License"); you may not use this file except
     * in compliance with the License.
     * You may obtain a copy of the License at
     *
     * https://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied. See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;

    public class TopicSubscriber {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_BROKER_PORT = "5673";
        String topicName = "myTopic";

        public static void main(String[] args) throws NamingException, JMSException {
            TopicSubscriber topicSubscriber = new TopicSubscriber();
            topicSubscriber.subscribe();
        }

        public void subscribe() throws NamingException, JMSException {
            // Register the connection factory under its JNDI name
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            // Subscribe to the topic and block until one message arrives
            Topic topic = topicSession.createTopic(topicName);
            javax.jms.TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
            Message message = topicSubscriber.receive();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("textMessage.getText() = " + textMessage.getText());
            }
            topicSession.close();
            topicConnection.close();
        }

        public String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_BROKER_PORT).append("'")
                    .toString();
        }
    }
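The delivery rule for topics described in this section (every subscriber gets its own copy, and a message published while nobody is subscribed is dropped) can be illustrated with a small in-memory model. This is a sketch only: the class is hypothetical, it does not talk to Message Broker, and it makes no claim about how the broker is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of topic delivery; it only mimics the rule described in the text.
final class TopicFanOutSketch {

    /** Each subscriber is modelled as the list of messages it has received so far. */
    private final List<List<String>> subscribers = new ArrayList<>();

    /** Registers a new subscriber and returns its inbox. */
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    /** Gives every current subscriber a copy of the message and returns the number of copies delivered. */
    int publish(String message) {
        for (List<String> inbox : subscribers) {
            inbox.add(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        TopicFanOutSketch myTopic = new TopicFanOutSketch();
        // No subscription yet: nothing is delivered, so the message is lost.
        System.out.println("copies delivered without subscribers: " + myTopic.publish("early message"));
        List<String> first = myTopic.subscribe();
        List<String> second = myTopic.subscribe();
        System.out.println("copies delivered to two subscribers: " + myTopic.publish("quote request"));
        System.out.println("first = " + first + ", second = " + second);
    }
}
```

In the model, as in the scenario above, subscribers must be registered before the message is published in order to receive it.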
Receiving JMS Messages
WSO2 ESB can listen to a queue or a topic on WSO2 Message Broker. Its configuration is flexible, so any broker
that supports JMS can be used. To receive messages over JMS, the JMS transport receiver must be enabled at the Axis2 level. Uncomment the JMS TransportReceiver section
specific to WSO2 Message Broker 2.x.x under Transport Ins (Listeners) in the [ESB_HOME]/repository/conf/axis2/axis2.xml file, and edit it as follows.
- If you want to listen for messages in a queue
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myQueueConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
    </transportReceiver>
- If you want to listen for messages in a topic
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>
    </transportReceiver>
Note that if you use only queues or only topics, the connection factory can be named "default" instead of using "myQueueConnectionFactory" and "myTopicConnectionFactory"
to distinguish them. At the ESB service level, "default" is picked up unless the service names another connection factory with the "transport.jms.ConnectionFactory" parameter. In the default axis2.xml file of the ESB, a configuration similar to "myQueueConnectionFactory"
is set as default.
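The choice between a named connection factory and "default" can be pictured with the following sketch. It is a simplified model under stated assumptions, not the transport's code: the class and method are hypothetical, and a plain map from the name in axis2.xml to the JNDI name stands in for the configured connection factories.

```java
import java.util.Map;

// Hypothetical model: which connection factory a service ends up with.
final class ConnectionFactoryChoiceSketch {
    static final String DEFAULT_NAME = "default";

    /**
     * Returns the JNDI name of the factory the service asked for,
     * or of the factory called "default" when the service names none.
     */
    static String choose(Map<String, String> definedFactories, String nameFromService) {
        String name = (nameFromService != null) ? nameFromService : DEFAULT_NAME;
        String jndiName = definedFactories.get(name);
        if (jndiName == null) {
            throw new IllegalArgumentException("No connection factory defined as: " + name);
        }
        return jndiName;
    }

    public static void main(String[] args) {
        // Name in axis2.xml -> transport.jms.ConnectionFactoryJNDIName
        Map<String, String> factories = Map.of(
                "default", "QueueConnectionFactory",
                "myTopicConnectionFactory", "TopicConnectionFactory");
        System.out.println(choose(factories, null));
        System.out.println(choose(factories, "myTopicConnectionFactory"));
    }
}
```

A service that only ever talks to queues can therefore omit the parameter entirely when the queue factory is registered as "default".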
The above configuration uses the jndi.properties file, which also needs appropriate modifications. Edit the [ESB_HOME]/repository/conf/jndi.properties file to
point to the running Message Broker by editing the following lines.
    connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
    connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
Note that QueueConnectionFactory and TopicConnectionFactory are used by ESB axis2 configurations above.
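To see how the entries in jndi.properties break down into JNDI names and values, the file content can be loaded with java.util.Properties and grouped by prefix. This is a sketch for inspection only: the class and method are hypothetical, and the text in main simply repeats entries used in this article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: groups jndi.properties entries by their prefix.
final class JndiPropertiesSketch {

    /** Returns the entries whose key starts with the prefix, keyed by the JNDI name that follows it. */
    static Map<String, String> entries(String propertiesText, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'",
                "connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'",
                "# queue.[jndiName] = [physicalName]",
                "queue.myQueue = myQueue",
                "topic.myTopic = myTopic");
        System.out.println("connection factories: " + entries(text, "connectionfactory.").keySet());
        System.out.println("queues: " + entries(text, "queue."));
        System.out.println("topics: " + entries(text, "topic."));
    }
}
```

The names listed under "connection factories" are the ones that transport.jms.ConnectionFactoryJNDIName in axis2.xml has to match.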
Receiving a Message from a Queue
Make sure there are messages in the queue that the listener is set to before running the following configuration. To send messages to the queue, a JMS
client or the proxy described under Sending a Message to a Queue can be used.
- JNDI.properties
The queues to be listened to should be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
we specify a queue called "myQueue" as below.
    # queue.[jndiName] = [physicalName]
    queue.myQueue = myQueue
- Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
- The following proxy listens to the JMS queue specified by the "transport.jms.Destination" parameter.
    <proxy name="JMSQueueListenerProxy" transports="jms">
        <target>
            <inSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
                <log level="full"/>
                <drop/>
            </inSequence>
            <outSequence>
            </outSequence>
        </target>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
        <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
        <parameter name="transport.jms.Destination">myQueue</parameter>
        <parameter name="transport.jms.DestinationType">queue</parameter>
    </proxy>
This proxy is configured to use the JMS transport (transports="jms"). The "transport.jms.ContentType" parameter tells the transport how to determine
the content type of a received message: it reads the JMS property "contentType" and falls back to application/xml when that property is not set, so there is no need to wrap the message in a SOAP envelope. As soon as a message is available on the
queue "myQueue", this proxy receives it and acknowledges it to WSO2 MB, which then removes the message from the queue. Because this is
a sample configuration, the proxy logs the message and drops it. In a real scenario, the message could be sent to another queue, routed to an endpoint chosen by
a parameter in the message body, or delivered to a service specified in the headers.
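The content type rule in the proxy above (use the "contentType" JMS property if present, otherwise the default) can be sketched as follows. This is a simplified model rather than the transport's own code: the class is hypothetical, and a plain map stands in for the properties of a JMS message.

```java
import java.util.Map;

// Hypothetical model of the <rules> element: JMS property first, default second.
final class ContentTypeRuleSketch {

    /** Returns the content type declared in the named message property, or the default when it is absent. */
    static String resolve(Map<String, String> jmsProperties, String propertyName, String defaultType) {
        String declared = jmsProperties.get(propertyName);
        return (declared != null) ? declared : defaultType;
    }

    public static void main(String[] args) {
        // A message that declares its own content type
        System.out.println(resolve(Map.of("contentType", "text/xml"), "contentType", "application/xml"));
        // A message without the property falls back to the default
        System.out.println(resolve(Map.of(), "contentType", "application/xml"));
    }
}
```

A producer that sets the "contentType" property on its messages can thus override the default on a per-message basis.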
Receiving a Message from a Topic
Make sure messages are being published to the topic that the listener is set to before running the following configuration. To send messages to the topic, a JMS
client or the proxy described under Sending a Message to a Topic can be used.
- JNDI.properties
The topics to be listened to should be specified in the [ESB_HOME]/repository/conf/jndi.properties file. In this example
we specify a topic called "myTopic" as below.
    # topic.[jndiName] = [physicalName]
    topic.myTopic = myTopic
- Start WSO2 ESB by running the wso2server.sh file (or the wso2server.bat file on Windows).
- The following proxy listens to the JMS topic specified by the "transport.jms.Destination" parameter.
    <proxy name="JMSTopicListenerProxy" transports="jms">
        <target>
            <inSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
                <log level="full"/>
                <drop/>
            </inSequence>
            <outSequence>
            </outSequence>
        </target>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
        <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
        <parameter name="transport.jms.Destination">myTopic</parameter>
        <parameter name="transport.jms.DestinationType">topic</parameter>
    </proxy>
Because this is a sample configuration, the proxy logs the message and drops it. In a real scenario, the message could be sent to another queue, routed to an endpoint chosen by
a parameter in the message body, or delivered to a service specified in the headers.
Conclusion
WSO2 ESB and WSO2 Message Broker can easily be configured together to implement useful messaging patterns. In this article we discussed how to route messages
to queues and topics of WSO2 Message Broker, and how to configure the ESB to listen to queues and topics. These configurations are important when
implementing robust messaging in which messages can be recovered after a failure during a transaction.
References
Author
Hasitha Abeykoon, Software Engineer, WSO2 inc.