2013/12/09

Building an In-Order, Guaranteed Delivery Messaging System for Your Enterprise with WSO2 Products

  • Chanaka Fernando
  • Director of Solutions Architecture - WSO2

Table of contents

  1. Business requirement
  2. How WSO2 can serve your requirement
  3. Configuring the WSO2 products

Business requirement

If you own a business and want it to grow, one of the most effective approaches is to make it accessible over the internet. As the business grows, thousands or even millions of people may interact with it at the same time. Online stores such as eBay, Amazon or Google Play need to process millions of messages per second.

When a customer interacts with such a system, the information exchanged needs to be saved somewhere (e.g. a database), as illustrated in the diagram below.

The above diagram illustrates how a client, with the use of a web browser, accesses your system, which is implemented as a web service and hosted somewhere on the internet. At a given moment, there can be any number of users accessing your business. To handle each and every request from the customers, you need to have a robust, reliable messaging system.

How can WSO2 meet your requirement?

WSO2 is an enterprise middleware company that provides a full set of products for enterprise integration scenarios. Mapping WSO2 products to the business scenario above is straightforward, because WSO2 provides purpose-built products for most enterprise integration needs.

The redesigned diagram, with the WSO2 products mapped onto it, is shown below.

According to the above figure, we need three WSO2 products, each designed for its respective use case.

WSO2 ESB - An integration engine that connects heterogeneous systems. Here it hosts a virtual service that integrates all the systems.

WSO2 MB - A reliable message broker that stores your messages.

WSO2 DSS - A simple yet powerful engine that performs database operations in a reliable manner.

Let's make it happen with actual configurations

First, download the three products from the WSO2 website (click on the links above).

After downloading the products, extract them to three directories, referred to below as ESB_HOME, MB_HOME and DSS_HOME. To install the WSO2 products in a Linux environment, follow the link below.

https://docs.wso2.org/display/ESB470/Installing+on+Linux

 

ESB_HOME - extracted directory for WSO2 ESB

MB_HOME - extracted directory for WSO2 MB

DSS_HOME - extracted directory for WSO2 DSS

Once the products are installed, let's integrate them to meet the business requirement.

1). Once you have extracted all three products, configure them to work together by editing their configuration files and copying the required libraries.

  • Configuring WSO2 MB

1. Multiple WSO2 products cannot be started simultaneously in the same environment with their default configurations. All WSO2 products use the same default ports, so the ports would conflict. To avoid this, apply a port offset in the <MB_HOME>/repository/conf/carbon.xml file by changing the offset value to 1. For example:

                      
                        <Ports>
                        <!-- Ports offset. This entry will set the value of the ports defined below to
                        the define value + Offset.
                        e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
                        -->
                        <Offset>1</Offset>
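
The effect of the offset is plain addition, which the following minimal sketch illustrates. The default port numbers in it (9443 for HTTPS, 9763 for HTTP, 5672 for the broker's AMQP listener) are assumptions about a stock installation, and `effective_ports` is a hypothetical helper written for this article, so check them against your own release.

```python
# Default ports assumed for a stock installation (not taken from this article,
# apart from the HTTPS example in the carbon.xml comment). Verify them against
# the configuration files of your own release.
DEFAULT_PORTS = {"https": 9443, "http": 9763, "amqp": 5672}


def effective_ports(offset, defaults=DEFAULT_PORTS):
    """Return the ports a server binds to once <Offset> is applied."""
    if offset < 0:
        raise ValueError("port offset must not be negative")
    return {name: port + offset for name, port in defaults.items()}


mb_ports = effective_ports(1)    # Message Broker, offset 1
dss_ports = effective_ports(2)   # Data Services Server, offset 2

print(mb_ports["amqp"])    # 5673
print(dss_ports["http"])   # 9765
```

Under these assumptions, offset 1 gives the 5673 broker port and offset 2 gives the 9765 port that appear in the configurations later in this article.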
                      
               

2. Start the Message Broker by running <MB_HOME>/bin/wso2server.sh (on Linux) or <MB_HOME>/bin/wso2server.bat (on Windows).

  • Configuring WSO2 ESB

1. Enable the JMS transport of WSO2 ESB to communicate with the Message Broker by editing the <ESB_HOME>/repository/conf/axis2/axis2.xml file. Find the commented-out <transportReceiver> block for MB 2.x.x and uncomment it.

                      
                        <!--Uncomment this and configure as appropriate for JMS transport support with WSO2 MB 2.x.x -->
                        <!--transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
                        …..
                        </parameter>
                        </transportReceiver-->
                      
                

Also, uncomment the <transportSender> block for JMS in the same file. Before uncommenting, it appears as follows:

                      
                        <!-- uncomment this and configure to use connection pools for sending messages>
                        <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/-->
                      
                

2. Copy the following JAR files from the <MB_HOME>/client-lib folder to the <ESB_HOME>/repository/components/lib folder. These are the client libraries the ESB needs to communicate with the Message Broker.

  • andes-client-0.13.wso2v8

  • geronimo-jms_1.1_spec-1.1.0.wso2v1

3. Open the <ESB_HOME>/repository/conf/jndi.properties file and point it to the running Message Broker.

Use 'carbon' as the virtual host. Define a queue called 'StockQuotesQueue'. Comment out the topic, as it is not needed. However, to avoid getting 'javax.naming.NameNotFoundException: TopicConnectionFactory' during server startup, point 'TopicConnectionFactory' to the Message Broker as well.

                      
                        # register some connection factories
                        # connectionfactory.[jndiname] = [ConnectionURL]
                        connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
                        connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
                        # register some queues in JNDI using the form
                        # queue.[jndiName] = [physicalName]
                        queue.StockQuotesQueue = StockQuotesQueue
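
To make the parts of the connection URL easier to see, here is a small sketch that splits it with Python's standard library. `parse_connection_url` is a hypothetical helper written for this article. It assumes a single broker in `brokerlist` with no extra broker options, which holds for the URL above but not for every valid AMQP connection URL.

```python
from urllib.parse import parse_qs, urlsplit


def parse_connection_url(url):
    """Split an amqp:// connection URL into its parts.

    Assumes exactly one broker in 'brokerlist' and no broker options.
    """
    parts = urlsplit(url)
    # netloc looks like "user:password@clientID"
    credentials, _, client_id = parts.netloc.rpartition("@")
    user, _, password = credentials.partition(":")
    # The broker address is wrapped in single quotes inside the query string.
    broker = urlsplit(parse_qs(parts.query)["brokerlist"][0].strip("'"))
    return {
        "user": user,
        "password": password,
        "client_id": client_id,
        "virtual_host": parts.path.lstrip("/"),
        "broker_host": broker.hostname,
        "broker_port": broker.port,
    }


info = parse_connection_url(
    "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'"
)
print(info["virtual_host"], info["broker_host"], info["broker_port"])
# carbon localhost 5673
```

The virtual host is the path segment ('carbon'), and the broker address carries the offset AMQP port of the Message Broker.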
                      

4. Start WSO2 ESB by running <ESB_HOME>/bin/wso2server.sh (on Linux) or <ESB_HOME>/bin/wso2server.bat (on Windows).

The ESB is now configured to work with the MB.

  • Configuring WSO2 DSS

1. As with the Message Broker, WSO2 DSS cannot run on its default ports alongside the other WSO2 products in the same environment, because all WSO2 products use the same default ports. To avoid port conflicts, apply a port offset in the <DSS_HOME>/repository/conf/carbon.xml file by changing the offset value to 2 (offset 1 is already used by the Message Broker). For example:

                      
                        <Ports>
                        <!-- Ports offset. This entry will set the value of the ports defined below to
                        the define value + Offset.
                        e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
                        -->
                        <Offset>2</Offset>
                      
                

2. The JMS transport for communicating with the Message Broker is not enabled by default in WSO2 DSS. Therefore, we need to add a <transportReceiver> block for MB 2.x.x by editing <DSS_HOME>/repository/conf/axis2/axis2.xml as follows:

  
    <!--Configure for JMS transport support with WSO2 MB 2.x.x -->
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>
        <parameter name="myQueueConnectionFactory" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
        <parameter name="default" locked="false">
            <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
            <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
            <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
            <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
    </transportReceiver>
  

3. Also, uncomment the <transportSender> block for JMS in the same file. Before uncommenting, it appears as follows:

  
    <!-- uncomment this and configure to use connection pools for sending messages>
    <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/-->
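
After editing axis2.xml, it is easy to leave one of the two JMS blocks commented out by accident. The sketch below is one way to check, using only Python's standard XML parser, which skips comments. `jms_transport_status` is a hypothetical helper, and the `sample` string is a cut-down stand-in for a real axis2.xml. It assumes the transport elements are direct children of the root element.

```python
import xml.etree.ElementTree as ET


def jms_transport_status(axis2_xml_text):
    """Report whether the JMS receiver and sender are active (not commented out)."""
    root = ET.fromstring(axis2_xml_text)
    return {
        "receiver": root.find("transportReceiver[@name='jms']") is not None,
        "sender": root.find("transportSender[@name='jms']") is not None,
    }


# Cut-down stand-in for axis2.xml: receiver enabled, sender still commented out.
sample = """<axisconfig name="AxisJava2.0">
  <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"/>
  <!-- uncomment this and configure to use connection pools for sending messages>
  <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/-->
</axisconfig>"""

print(jms_transport_status(sample))
# {'receiver': True, 'sender': False}
```

To check a real installation, pass in the contents of the product's axis2.xml file. Both values should be True before you start the server.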
  

4. Create the following jndi.properties file and add it to the <DSS_HOME>/repository/conf/ directory to point to the running Message Broker:

  
    # Copyright (c) 2011, WSO2 Inc. (https://wso2.com) All Rights Reserved.
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    # https://www.apache.org/licenses/LICENSE-2.0
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # register some connection factories
    # connectionfactory.[jndiname] = [ConnectionURL]
    connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
    connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
    # register some queues in JNDI using the form
    queue.StockQuotesQueue = StockQuotesQueue
    # register some topics in JNDI using the form
    # topic.[jndiName] = [physicalName]
  

5. Copy the following client library JAR files from the <MB_HOME>/client-lib folder to <DSS_HOME>/repository/components/lib.

  • andes-client-0.13.wso2v8

  • geronimo-jms_1.1_spec-1.1.0.wso2v1

6. Once the JMS transport is enabled for DSS, it applies to all the services deployed in the Data Services Server, which can cause an unexpected error during server startup. To prevent this error, add the following parameter to the <service> entry of <DSS_HOME>/repository/deployment/server/dataservices/samples/RDBMSSample_services.xml.

  
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
  

Then copy the two files, RDBMSSample_services.xml and RDBMSSample.dbs, from this 'samples/' directory into the <DSS_HOME>/repository/deployment/server/dataservices/ directory.

7. Save all the files and start the DSS by running <DSS_HOME>/bin/wso2server.sh (on Linux) or <DSS_HOME>/bin/wso2server.bat (on Windows).

Now you have successfully configured the WSO2 DSS server.

2). Create an interface in the ESB that the web service client can access.

We can create a proxy service to handle the requests coming from the web browser into the ESB. The proxy configuration is given below.

  
    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="RequestInProxy"
           transports="https,http"
           statistics="enable"
           trace="disable"
           startOnLoad="true">
        <target>
            <inSequence>
                <property name="OUT_ONLY" value="true"/>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                <property name="transportNonBlocking" scope="axis2" action="remove"/>
                <log level="custom">
                    <property name="STATUS" value="Message Received"/>
                </log>
                <send>
                    <endpoint>
                        <address
                            uri="jms:/StockQuotesQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;transport.jms.DestinationType=queue&amp;java.naming.provider.url=repository/conf/jndi.properties"
                            format="pox"/>
                    </endpoint>
                </send>
            </inSequence>
            <faultSequence>
                <log level="custom">
                    <property name="ERROR" value="Error in MessageReceivingProxy - Deactivating service"/>
                </log>
            </faultSequence>
        </target>
        <description/>
    </proxy>
  

You can save this proxy in the <ESB_HOME>/repository/deployment/server/synapse-configs/default/proxy-services directory. This proxy saves each message it receives to the JMS queue named StockQuotesQueue.
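
The JMS endpoint address is long and easy to get wrong by hand, so it can help to assemble it from its parameters. The sketch below is an illustration only. `jms_endpoint_uri` is a hypothetical helper, and the parameter values are the ones used in this article. Because each key appears once in the dictionary, a parameter such as java.naming.provider.url cannot end up in the URI twice.

```python
from xml.sax.saxutils import quoteattr


def jms_endpoint_uri(destination, params):
    """Build a jms:/ endpoint URI from a destination name and its parameters."""
    query = "&".join(f"{key}={value}" for key, value in params.items())
    return f"jms:/{destination}?{query}"


params = {
    "transport.jms.ConnectionFactoryJNDIName": "QueueConnectionFactory",
    "java.naming.factory.initial": "org.wso2.andes.jndi.PropertiesFileInitialContextFactory",
    "transport.jms.DestinationType": "queue",
    "java.naming.provider.url": "repository/conf/jndi.properties",
}

uri = jms_endpoint_uri("StockQuotesQueue", params)

# quoteattr() adds the surrounding quotes and escapes '&' as '&amp;',
# which is required when the URI is placed in an XML attribute.
print(f'<address uri={quoteattr(uri)} format="pox"/>')
```

The printed element can be pasted into the <endpoint> of the proxy configuration.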

3). Once the messages are saved in the queue, the DSS needs to read them from the queue and write them to the database. This can be done using one of the following approaches.

  • Using the JMS endpoint in the DSS

If we are using the JMS endpoint, we need to write a data service that consumes the messages from the MB queue. First, you need to configure your database with the DSS server. If you are using a MySQL database, you can follow this link to configure it.

https://wso2.com/library/tutorials/create-simple-data-service-with-wso2-data-services

 

Once you have configured the database with the DSS, you can define your data service to retrieve the messages from the MB using the JMS transport. To do this, you need the following two files in the <DSS_HOME>/repository/deployment/server/dataservices directory.

StockQuotesQueue_services.xml

----------------------------------------------

  
    <serviceGroup>
        <service name="StockQuotesQueue">
            <parameter name="transport.jms.ContentType">
                <rules>
                    <jmsProperty>contentType</jmsProperty>
                    <default>application/xml</default>
                </rules>
            </parameter>
            <parameter name="JMS_REPLY_TO">DSS_FAULTS</parameter>
        </service>
    </serviceGroup>
  

StockQuotesQueue.dbs

---------------------------------------------

  
    <data enableBatchRequests="true" name="StockQuotesQueue">
        <description>Sample Data Service</description>
        <config id="DemoDataSource">
            <property name="driverClassName">com.mysql.jdbc.Driver</property>
            <property name="url">jdbc:mysql://localhost:3306/ESB_SP_SAMPLE</property>
            <property name="username">root</property>
            <property name="password">root123</property>
        </config>
        <query id="insertRequestData" useConfig="DemoDataSource">
            <sql>Call ESB_SP_SAMPLE.InsertRequestData(?,?,?,?)</sql>
            <properties>
                <property name="org.wso2.ws.dataservice.query_timeout">100</property>
                <property name="org.wso2.ws.dataservice.force_jdbc_batch_requests">true</property>
            </properties>
            <param name="name" ordinal="1" sqlType="STRING"/>
            <param name="id" ordinal="2" sqlType="STRING"/>
            <param name="price" ordinal="3" sqlType="DOUBLE"/>
            <param name="location" ordinal="4" sqlType="STRING"/>
        </query>
        <operation name="insertRequestData" returnRequestStatus="false">
            <call-query href="insertRequestData">
                <with-param name="name" query-param="name"/>
                <with-param name="id" query-param="id"/>
                <with-param name="price" query-param="price"/>
                <with-param name="location" query-param="location"/>
            </call-query>
        </operation>
    </data>
  

In this data service, the message retrieved from the MB is written into the database using a stored procedure that is defined in the MySQL database. You can create the related database and the stored procedure with the following commands.

1. Connect to the MySQL server.

chanaka@chanaka-laptop:~$ mysql -u root -p

Enter password:

2. Create a sample database.

DROP DATABASE IF EXISTS ESB_SP_SAMPLE;

CREATE DATABASE ESB_SP_SAMPLE;

3. Create a table using the following statement.

USE ESB_SP_SAMPLE;

DROP TABLE IF EXISTS company;

CREATE TABLE company(name VARCHAR(10), id VARCHAR(10), price DOUBLE, location VARCHAR(10));

4. Insert some data into the company table using the following statements.

INSERT INTO company VALUES ('WSO2','c1',2.9563,'SL');

INSERT INTO company VALUES ('IBM','c2',3.7563,'US');

INSERT INTO company VALUES ('SUN','c3',3.8349,'US');

INSERT INTO company VALUES ('MSFT','c4',3.2938,'US');

5. Create the necessary Stored Procedures.

DROP PROCEDURE IF EXISTS InsertRequestData;

CREATE PROCEDURE InsertRequestData(compName VARCHAR(10), compId VARCHAR(10), compPrice DOUBLE, compLocation VARCHAR(10)) INSERT INTO company VALUES(compName,compId,compPrice,compLocation) ;
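
If you want to sanity-check the table layout and the parameter order of the insert without a MySQL server at hand, the following sketch mirrors them in an in-memory SQLite database. This is a stand-in, not the setup used in this article. SQLite has no stored procedures, so a plain Python function (`insert_request_data`, a hypothetical name) plays the role of InsertRequestData, and SQLite does not enforce the VARCHAR(10) lengths. The last row uses the values of the sample message sent later in this article.

```python
import sqlite3


def insert_request_data(conn, name, company_id, price, location):
    """Stand-in for the InsertRequestData stored procedure (same parameter order)."""
    conn.execute(
        "INSERT INTO company VALUES (?, ?, ?, ?)",
        (name, company_id, price, location),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE company(name VARCHAR(10), id VARCHAR(10), "
    "price DOUBLE, location VARCHAR(10))"
)
conn.executemany(
    "INSERT INTO company VALUES (?, ?, ?, ?)",
    [
        ("WSO2", "c1", 2.9563, "SL"),
        ("IBM", "c2", 3.7563, "US"),
        ("SUN", "c3", 3.8349, "US"),
        ("MSFT", "c4", 3.2938, "US"),
    ],
)

insert_request_data(conn, "WSO2-Mob", "c45", 33.33, "SL")

rows = conn.execute(
    "SELECT name, id, price, location FROM company ORDER BY rowid"
).fetchall()
print(len(rows))   # 5
print(rows[-1])    # ('WSO2-Mob', 'c45', 33.33, 'SL')
```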

Now we are all set. Send a message to the proxy service using SoapUI and observe that the sample data is stored in the database once the message arrives at the DSS. You can change the configuration so that the actual message content is saved in the database. Here is a sample message you can send from SoapUI to be saved in the database.

  
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
        <soapenv:Header/>
        <soapenv:Body>
            <p:insertRequestData xmlns:p="http://ws.wso2.org/dataservice">
                <p:name>WSO2-Mob</p:name>
                <p:id>c45</p:id>
                <p:price>33.33</p:price>
                <p:location>SL</p:location>
            </p:insertRequestData>
        </soapenv:Body>
    </soapenv:Envelope>
  

Now you can see the data you have sent to the ESB is saved in the MySQL database.
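
If you prefer a script to a GUI client, the same request can be built and posted with Python's standard library. The sketch below rests on a few assumptions. It uses the standard SOAP 1.1 envelope namespace and the http://ws.wso2.org/dataservice namespace for the payload. `build_insert_request` and `send` are hypothetical helper names. The proxy URL is not given in this article, so it is left as an argument.

```python
import urllib.request
import xml.etree.ElementTree as ET

SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"  # SOAP 1.1 envelope
DS_NS = "http://ws.wso2.org/dataservice"               # assumed payload namespace


def build_insert_request(name, company_id, price, location):
    """Return the insertRequestData SOAP envelope as a string."""
    ET.register_namespace("soapenv", SOAP_NS)
    ET.register_namespace("p", DS_NS)
    envelope = ET.Element(f"{{{SOAP_NS}}}Envelope")
    ET.SubElement(envelope, f"{{{SOAP_NS}}}Header")
    body = ET.SubElement(envelope, f"{{{SOAP_NS}}}Body")
    request = ET.SubElement(body, f"{{{DS_NS}}}insertRequestData")
    fields = (("name", name), ("id", company_id), ("price", price), ("location", location))
    for tag, value in fields:
        ET.SubElement(request, f"{{{DS_NS}}}{tag}").text = str(value)
    return ET.tostring(envelope, encoding="unicode")


def send(url, payload, action="urn:insertRequestData"):
    """POST the envelope to the given proxy URL and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "text/xml; charset=utf-8", "SOAPAction": f'"{action}"'},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


payload = build_insert_request("WSO2-Mob", "c45", 33.33, "SL")
print(payload)
```

Calling `send(proxy_url, payload)` with the address of the RequestInProxy service on your ESB posts the message. The script only builds and prints the envelope, so it runs without a server.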

  • Using a Message Processor defined in the ESB

You can achieve the same result by using a message processor defined in the ESB instead of defining a JMS data service on the DSS side. For this, change the proxy service so that it saves the message in a message store, and then use the message processor to send the message to the DSS service.

1. Define a message store by copying and pasting the following configuration into the ESB source view. Alternatively, you can use the message-store UI.

  
    <messageStore name="JMSMS" class="org.apache.synapse.message.store.impl.jms.JmsStore" xmlns="http://ws.apache.org/ns/synapse">
        <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
        <parameter name="store.jms.destination">JMSMS</parameter>
        <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
        <parameter name="store.jms.cache.connection">false</parameter>
    </messageStore>
  

2. Define an endpoint to send the message. We need to send the message to the DSS service.

  
    <endpoint xmlns="http://ws.apache.org/ns/synapse" name="StockQuoteDataService">
        <address uri="http://localhost:9765/services/StockQuoteDataService">
            <suspendOnFailure>
                <progressionFactor>1.0</progressionFactor>
            </suspendOnFailure>
            <markForSuspension>
                <retriesBeforeSuspension>0</retriesBeforeSuspension>
                <retryDelay>0</retryDelay>
            </markForSuspension>
        </address>
    </endpoint>
  

For this approach, you only need the .dbs file in the DSS, which you can change as shown below.

StockQuoteDataService.dbs

----------------------------------------

  
    <data enableBatchRequests="true" name="StockQuoteDataService">
        <description>Sample Data Service</description>
        <config id="DemoDataSource">
            <property name="driverClassName">com.mysql.jdbc.Driver</property>
            <property name="url">jdbc:mysql://localhost:3306/ESB_SP_SAMPLE</property>
            <property name="username">root</property>
            <property name="password">root123</property>
        </config>
        <query id="insertRequestData" useConfig="DemoDataSource">
            <sql>Call ESB_SP_SAMPLE.InsertRequestData(?,?,?,?)</sql>
            <properties>
                <property name="org.wso2.ws.dataservice.query_timeout">100</property>
                <property name="org.wso2.ws.dataservice.force_jdbc_batch_requests">true</property>
            </properties>
            <param name="name" ordinal="1" sqlType="STRING"/>
            <param name="id" ordinal="2" sqlType="STRING"/>
            <param name="price" ordinal="3" sqlType="DOUBLE"/>
            <param name="location" ordinal="4" sqlType="STRING"/>
        </query>
        <operation name="insertRequestData" returnRequestStatus="false">
            <call-query href="insertRequestData">
                <with-param name="name" query-param="name"/>
                <with-param name="id" query-param="id"/>
                <with-param name="price" query-param="price"/>
                <with-param name="location" query-param="location"/>
            </call-query>
        </operation>
    </data>
  

Copy this file into the <DSS_HOME>/repository/deployment/server/dataservices/ directory.

3. Define a message forwarding processor as shown below by copying and pasting the code or using the Management Console UI in ESB.

  
    <messageProcessor name="Processor1" class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                      targetEndpoint="StockQuoteDataService" messageStore="JMSMS" xmlns="http://ws.apache.org/ns/synapse">
        <parameter name="interval">1000</parameter>
        <parameter name="client.retry.interval">1000</parameter>
        <parameter name="max.delivery.attempts">4</parameter>
        <parameter name="is.active">true</parameter>
    </messageProcessor>
  

4. Invoke the scenario using a proxy service such as the one below.

  
    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="RequestInProxy2"
           transports="https,http"
           statistics="disable"
           trace="disable"
           startOnLoad="true">
        <target>
            <inSequence>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                <property name="OUT_ONLY" value="true"/>
                <property name="target.endpoint" value="StockQuoteDataService"/>
                <log level="full"/>
                <store messageStore="JMSMS"/>
            </inSequence>
        </target>
        <description/>
    </proxy>
  

Whenever a message arrives at this proxy service, it is stored in the JMS message store (that is, in the JMSMS queue in the Message Broker). If the message processor is disabled while messages are being sent, you will see the message count of the JMSMS queue increase in the Message Broker's management console.
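
The store-and-forward pattern is what gives this setup its in-order, guaranteed-delivery behaviour, and a toy model makes the idea concrete. The sketch below is not the Synapse implementation. `ForwardingProcessor` is a hypothetical class that assumes the processor always retries the oldest message, removes it from the store only after a successful send, and deactivates itself after `max_delivery_attempts` consecutive failures instead of skipping ahead.

```python
from collections import deque


class ForwardingProcessor:
    """Toy model of a scheduled message forwarding processor."""

    def __init__(self, store, send, max_delivery_attempts=4):
        self.store = store                  # FIFO queue of pending messages
        self.send = send                    # callable returning True on success
        self.max_delivery_attempts = max_delivery_attempts
        self.attempts = 0
        self.active = True

    def tick(self):
        """Run one polling interval: try to deliver the oldest message."""
        if not self.active or not self.store:
            return
        message = self.store[0]             # peek, do not remove yet
        if self.send(message):
            self.store.popleft()            # remove only after success
            self.attempts = 0
        else:
            self.attempts += 1
            if self.attempts >= self.max_delivery_attempts:
                self.active = False         # stop rather than reorder or drop


delivered = []
outcomes = iter([False, False])             # the first two sends fail


def flaky_send(message):
    ok = next(outcomes, True)
    if ok:
        delivered.append(message)
    return ok


store = deque(["m1", "m2", "m3"])
processor = ForwardingProcessor(store, flaky_send)
for _ in range(6):
    processor.tick()

print(delivered)   # ['m1', 'm2', 'm3']
```

Even though the backend fails twice, the messages arrive in their original order, because a later message is never attempted while an earlier one is still pending.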

4). Now we are all set. Send a message to the proxy service using SoapUI and observe that the sample data is stored in the database once the message arrives at the DSS. You can change the configuration so that the actual message content is saved in the database.

Here is a sample message you can send from SoapUI to be saved in the database.

  
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
        <soapenv:Header/>
        <soapenv:Body>
            <p:insertRequestData xmlns:p="http://ws.wso2.org/dataservice">
                <p:name>WSO2-Mob</p:name>
                <p:id>c45</p:id>
                <p:price>33.33</p:price>
                <p:location>SL</p:location>
            </p:insertRequestData>
        </soapenv:Body>
    </soapenv:Envelope>
  

You need to enable WS-Addressing in SoapUI, set the action to urn:insertRequestData, and point the To header at the SOAP service.

Author: Chanaka Fernando, Technical Lead, WSO2 Inc.

 
