WSO2 ESB by Example - JMS Failover

Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to relevant product page for more up-to-date product information and resources.
  • By Rajika Kumarasiri
  • 21 Oct, 2011

Applies To

WSO2 ESB 4.0.0 and above
Apache ActiveMQ 5.4.1

Table of Content

Introduction

Message Oriented Middleware(MOM) plays a major role in Enterprise Application Integration(EAI).
Java Messaging Service or JMS[1] for short is a popular Java messaging API.

WSO2 ESB has a
production ready JMS transport which you can use to connect to various JMS providers. This
article describes a JMS failover scenario which you can implement using WSO2 ESB to make
sure that your JMS services will have high availability. This is a result of a real use case of WSO2 ESB.

The first section will cover the problem in detail that we are trying to solve. Then subsequent
sections will be providing complete code samples and configurations upon which the solution
is built. The material assumes some understanding of WSO2 ESB and it's self explained as much as possible.

JMS Failover

JMS still plays a major role when comes to application integration.One key point that enterprise messaging provides
is the application loose coupling. A queue data structure act as the link between two different applications.
Another important point is the asynchronous behaviour of messaging systems. The sending application does not
requires to have the receiving application to be online or vice versa. And also messaging systems are very
reliable. These factors give rice to use messaging systems extensively in enterprises.

It seems that messaging protocols such as Advance Message Queuing Protocl(AMQP) [2] also gaining the popularity.

An often requirement that comes in JMS integration is how to achieve the zero down time or high availability of the
services that are integrate through JMS. Read on to see how you can achieve this using WSO2 ESB

Use Case

This use case is a real world problem. A banking application had a legacy JMS system which keeps on sending
messages to a JMS broker. WSO2 ESB has to pick the messages and send them to back end HTTP endpoint. The legacy
JMS system will keep on sending the messages regardless of WSO2 ESB is online or offline to accept the
messages and process them. If WSO2 ESB fails that will mark a service unavailability which what we need to avoid.

Solution

WSO2 ESB's JMS transport is a polling transport. What that means is, when you use WSO2 ESB as a message consumer you
can configure that to listen on a queue(or topic) and it'll start to poll the queue(or topic) immediately. You can
configure a Proxy service to expose on JMS and this service will start to listen on queue(or topic) on the message broker.
The proxy will dispatch any messages that comes from the legacy JMS system into WSO2 ESB. This is the traditional approach
that we use when we want to poll a queue(or topic) using WSO2 ESB(See ESB documentation [3] and sample #250[4] if you are new
to WSO2 ESB). But this approach has a drawback when comes to achieve zero downtime of back end service. Since WSO2 ESB
act as the bridge which connect JMS message broker to back end HTTP service it's important to keep that link live. If WSO2 ESB
fails that will affect the service availability. How to make sure all the messages are delivered to backend HTTP
service. In other word how to achieve some kind of JMS failover mechanism at WSO2 ESB level?

Having JMS here has an added advantage. If the WSO2 ESB link fails still the massages will be persisted in the
JMS broker so that they can process later. But that will not fix our problem. What will happen if the WSO2 ESB link fails?
With no doubt unprocessed messages will be accumulated at the JMS broker, but backend service will be unavailable until the WSO2
ESB become alive. The suggested solution is as follows and the following diagram shows the solution graphically.

Figure1: JMS failover at WSO2 ESB level.

There are two instance of WSO2 ESB running which has the same JMS proxy
deployed (assume instance 1 running proxy1 and instance 2 running proxy2) but one of the proxies will be
initially in inactive state(proxy2). So only the active Proxy(proxy1) will
process the messages as usual. Now assume due to some reason proxy1 fails(server crashed). WSO2 ESB instance2(which has
the proxy2 deployed) will detect that proxy1 is not any more and will mark the proxy2 as active. So now
the unprocessed messages will be processed again.

A separate scheduled task that runs on ESB instance 2(which has the proxy2 deployed) will send heart beat messages to ESB instance 2 to check the service availability.
If the scheduled task detects that the proxy1 is gone, task will mark the inactive proxy2 as active. From this point proxy2 will
re-establish the link between the broker and the backend HTTP service. Depending on your requirement to make the proxy1 the primary
processing proxy, you can configure the ESB instance 2 to turn on proxy1 and go offline itself. But for simplicity we'll consider
that after a failure proxy2 becomes the primary processing proxy.

The scheduled task gives the flexibility to configure the heart beat duration. The task trigger time
can be defined using a famous cron job format giving you the full flexibility.

JMS failover using WSO2 ESB's JMS Transport

This section describes how the solution was implemented. It also provides the full working configurations and code examples that can be
can use to try out the scenario yourself.

The JMS Proxy Configuration

This is a conventional JMS proxy that can be used with your enterprise JMS provider. This configuration very similar to the
proxy configuration given in sample#250[4]. This guide will use ActiveMQ[5] as the JMS provider.Following configuration
describes the JMS proxy configuration.

<definitions xmlns="http://ws.apache.org/ns/synapse">
    <proxy name="StockQuoteProxy" transports="jms">
        <target>
            <inSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
            </inSequence>
            <endpoint>
                <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
            </endpoint>
            <outSequence>
                <send/>
            </outSequence>
        </target>
        <publishWSDL uri="file:repository/samples/resources/proxy/sample_proxy_1.wsdl"/>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
    </proxy>
</definitions>

The JMS connection factory configuration in axis2.xml. The axis2.xml can be found at $ESB_HOME/repository/conf/axis2.xml

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
        <parameter name="myTopicConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
        </parameter>

        <parameter name="myQueueConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>

        <parameter name="default" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
        </parameter>
</transportReceiver>
        

The Java Logic to Deactivate the Proxy

This logic will be written as a task deceleration class for ESB instance 2. This will be used as a part of the scheduled task configuration
which get triggered according to the defined time interval. The logic is quite simple. When triggers, it will check for service
health using a simple and straight forward technique. It'll look for the proxy service url and try access it. Based on the outcome of
that attempt the health of the proxy will be determined. Based on that result an authenticated client will mark the local proxy
(in this case proxy2) as active. To mark a service active we can use the API provided by the ServiceAdminStub[6]. This class
comes with each distribution of ESB so no additional jars need to be added into the class path. The task class is commented well so that
it can be read and understand well. The class depends on external parameters for providing the username, password and the url of the remote
server so that it can logged in and deactivate the service. The default key stores and trust stores distribute by WSO2 products will be used.

package esb;

import org.apache.axis2.AxisFault;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.synapse.task.Task;
import org.wso2.carbon.authenticator.stub.AuthenticationAdminStub;
import org.wso2.carbon.authenticator.stub.LoginAuthenticationExceptionException;
import org.wso2.carbon.service.mgt.stub.ServiceAdminStub;
import org.wso2.carbon.service.mgt.stub.types.carbon.ServiceMetaData;
import org.wso2.carbon.utils.NetworkUtils;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.rmi.RemoteException;

/**
 * This code is only for demonstration purposes. It lack of proper logging and exception handling
 */
public class JMSProxyActivateTask implements Task {

    private String userName = "admin";

    private String passWord = "admin";

    private String host = "localhost";

    private String localPort = "9444";

    private String remotePort = "9443";

    private String serviceName = "StockQuoteProxy";

    private volatile boolean isServiceAlreadyActiaved = false;

    // Initialize the key stores and trust stores
    static {
        System.setProperty("javax.net.ssl.trustStore",
                "/home/rajika/docs/ot-articles/jms-failover/instance1/wso2esb-4.0.0/repository/" +
                        "resources/security/wso2carbon.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        System.setProperty("javax.net.ssl.trustStoreType", "JKS");
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassWord() {
        return passWord;
    }

    public void setPassWord(String passWord) {
        this.passWord = passWord;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getLocalPort() {
        return localPort;
    }

    public void setLocalPort(String localPort) {
        this.localPort = localPort;
    }

    public String getRemotePort() {
        return remotePort;
    }

    public void setRemotePort(String remotePort) {
        this.remotePort = remotePort;
    }

    public void execute() {
        String autenticateAdminUrl = "https://" + host + ":" + localPort +
                "/services/AuthenticationAdmin";
        String serivceMgtAdminUrl = "https://" + host + ":" + localPort + "/services/ServiceAdmin";

        if (!isServiceAlreadyActiaved) {
            if (!isServerAlive(host, Integer.parseInt(remotePort))) {
                // remote server has gone down, do the needful
                System.out.println("Server is dead!, the local proxy '" + serviceName + "' will be "
                        + "activated");
                String cookie;
                try {
                    cookie = getSessionCookie(autenticateAdminUrl, userName, passWord);
                    changeServiceState(serivceMgtAdminUrl, serviceName, cookie);
                    isServiceAlreadyActiaved = true;
                } catch (Exception e) {
                    System.err.println("Cloud not activate the service ' " + serviceName + "', "
                            + e.getMessage());
                }
            }
        }
    }

    /**
     * Detects if the server is alive
     * @param host remote host
     * @param port remote remotePort
     * @return flag describing server health
     */
    private static boolean isServerAlive(String host, int port) {
        Socket s = null;
        boolean isAlive = true;
        try {
            s = new Socket(host, port);
        } catch (IOException e) {
            isAlive = false;
        } finally {
            if (s != null) {
                try {
                    s.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        }
        return isAlive;
    }

    /**
     * @param serviceURL AuthenticationAdmin service URL
     * @param userName username
     * @param passWord password
     * @throws LoginAuthenticationExceptionException in an error
     * @throws RemoteException in an error
     * @throws SocketException in an error
     * @return String the session cookie
     */
    private static String getSessionCookie(String serviceURL, String userName, String passWord)
            throws LoginAuthenticationExceptionException, RemoteException, SocketException {
        try {
                AuthenticationAdminStub autheticationAdminStub = new AuthenticationAdminStub(serviceURL);
            boolean isLogged = autheticationAdminStub.login(userName, passWord,
                    NetworkUtils.getLocalHostname());
            ServiceContext serivceContext = autheticationAdminStub._getServiceClient().
                    getLastOperationContext().getServiceContext();
            return (String) serivceContext.getProperty(HTTPConstants.COOKIE_STRING);
        } catch (AxisFault axisFault) {
            throw new AxisFault(axisFault.getMessage());
        }
    }

    /**
     * @param serivceMgtAdminurl The admin service URL of service management admin service
     * @param serviceName        The service name
     * @param cookie             The password cookie
     * @throws AxisFault in case of an error
     */
    private static void changeServiceState(String serivceMgtAdminurl,
                                           String serviceName,
                                           String cookie) throws AxisFault {
        try {
            // generated code from https://localhost:9443/services/ServiceAdmin?wsdl
            ServiceAdminStub serviceAdminStub = new ServiceAdminStub(serivceMgtAdminurl);
            ServiceClient serviceClient = serviceAdminStub._getServiceClient();

            Options option = serviceClient.getOptions();
            option.setManageSession(true);
            option.setProperty(HTTPConstants.COOKIE_STRING, cookie);

            ServiceMetaData serviceMetaData = serviceAdminStub.getServiceData(serviceName);
            if (!serviceMetaData.getActive()) {
                serviceAdminStub.changeServiceState(serviceName, true);
            }
        } catch (RemoteException e) {
            throw new AxisFault(e.getMessage());
        }
    }
}

The task configuration

Following is the task configuration. The task configuration has parameters to provide the username, password and the url of the remote server.
For the illustration purposes the default credentials are assumed. This need to fix according to the enterprise requirement. A
more suitable way for this would be to read the password entry from a property file and compare that against a stored password and use that
for authentication. This can be improved to provide custom keystore/truststore and passwords according to the requirements.

<task class="esb.JMSProxyActivateTask" name="JMSProxyActivateTask">
     <property name="userName" value="admin"/>
     <property name="passWord" value="admin"/>
     <property name="host" value="localhost"/>
     <property name="localPort" value="9444"/>
     <property name="remotePort" value="9443"/>
     <trigger interval="5"/>
</task>
       

Full Configuration for ESB Instance 2

This configuration consists of the JMS proxy definition which act as the back for proxy1
and the task definition which check the health of the instance2.

 <definitions xmlns="http://ws.apache.org/ns/synapse">
    <task class="esb.JMSProxyActivateTask" name="JMSProxyActivateTask">
        <property name="userName" value="admin"/>
        <property name="passWord" value="admin"/>
        <property name="host" value="localhost"/>
        <property name="localPort" value="9444"/>
        <property name="remotePort" value="9443"/>
        <trigger interval="5"/>
   </task>

   <proxy name="StockQuoteProxy" transports="jms">
        <target>
            <inSequence>
                <property action="set" name="OUT_ONLY" value="true"/>
            </inSequence>
            <endpoint>
                <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
            </endpoint>
            <outSequence>
                <send/>
            </outSequence>
        </target>
        <publishWSDL uri="file:repository/samples/resources/proxy/sample_proxy_1.wsdl"/>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>application/xml</default>
            </rules>
        </parameter>
   </proxy>
</definitions>

Deployment

This section describes how you can deploy the previously defined various configurations so that you can run the solution and see. Since we are
trying to achieve failover behaviour at the ESB layer we need to make sure we deploy the two ESB instances in two physically separated
hardware. This will ensure us that if one fails there is no chance to fail both of them at the same time. If we deploy the two instances of
ESB on the same hardware, there is a possibility of crashing one ESB service if other crashes. To avoid this we need to deploy on two physically
separated hardware. Full sample configurations and the binary version of the task is given in the resource section.

Step 1 - Start the ActiveMQ server and this will act as our JMS broker.

Step 2 - Copy the ActiveMQ client jars(activemq-core-VERSION.jar, geranimo-j2ee-management_VERSION.jar) into
$ESB_HOME/lib/api of each ESB instance. Also enable JMS transport receiver for each instance as given above. Start the ESB with the JMS
proxy configuration. This instance is the instance 1.

Step 3 - Create a jar with the task class deceleration and place it into $ESB_HOME/repository/components/lib of the second
ESB instance. This is the instance 2. Following build file can be used to build the jar.

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.wso2.carbon</groupId>
    <artifactId>jms-task</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>bundle</packaging>

    <name>org.wso2.carbon</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.synapse</groupId>
            <artifactId>synapse-core</artifactId>
            <version>2.1.0-wso2v2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.axis2.wso2</groupId>
            <artifactId>axis2</artifactId>
            <version>1.6.1.wso2v1</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.carbon</groupId>
            <artifactId>org.wso2.carbon.authenticator.stub</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.carbon</groupId>
            <artifactId>org.wso2.carbon.service.mgt.stub</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.wso2.carbon</groupId>
            <artifactId>org.wso2.carbon.core</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.felix</groupId>
                <artifactId>maven-bundle-plugin</artifactId>
                <version>1.4.0</version>
                <extensions>true</extensions>
                <configuration>
                    <instructions>
                        <Export-Package>
                            esb.*,
                            org.wso2.carbon.authenticator.stub.*,
                            org.wso2.carbon.service.mgt.stub.*
                        </Export-Package>
                        <Import-Package>
                            !esb.*,
                            *;resolution:=optional
                        </Import-Package>
                    </instructions>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
       

Start instance 2 with the proxy and the task configuration. Then logged into the management
console and make the proxy(which is proxy2)as deactivate. This can be done using the service dashboard.This is to make sure
that the second processing proxy is in inactive state initially. The solution doesn't need to both proxies alive at the same time.
You can also see the following log when you deactivate the JMS proxy.

JMSListener Stopped listening for JMS messages to service : StockQuoteProxy

Step 4 - Send some JMS message to the broker using the following command. You need to switch your working directory int
$ESB_HOME/samples/axis2Client folder. Note how proxy1 process the messages.

ant jmsclient -Djms_type=pox -Djms_dest=dynamicQueues/StockQuoteProxy -Djms_payload=MSFT

Step 4 - Kill the ESB instance 1 instantly. This is to illustrate a server crashed instance of instance 1 ESB.
You can use the kill command to send a SIGTERM to the instance 2 ESB process

kill - 9 ESB-instance-1-process-id

Step 5 - Now check the esb instance 2 console. Depending on the cron job task duration you can see that the JMS proxy in
instance 2 become active and start to process the messages. You can see the following log entries as well.

INFO - JMSListener Started to listen on destination : StockQuoteProxy of type queue for service StockQuoteProxy

Conclusion

As shown, upon instance 1 crashed the unprocessed messages are picked by instance 2. This shows how you can achieve JMS failover
at ESB layer.

Future Work

Although the proposed solution provides the solution that we need ( that is to make sure there are not any unprocessed messages)
due to a server crashed the proposed solution has some limitations. Upon a server crashed proxy2 becomes the primary processing
proxy that may not be the case. We can add additional configurations to shutdown the proxy2 once the proxy1 become
available. Another improvement would be to start the instance 2 proxy with inactive state initially. These improvements will make the
system more automatic and will continue to function without any human interactions.

Resources

References:


  1. http://www.oracle.com/technetwork/java/index-jsp-142945.html
  2. http://www.amqp.org/
  3. https://wso2.com/project/esb/java/4.0.0/docs/

  4. https://wso2.com/project/esb/java/4.0.0/docs/samples/transport_samples.html#Sample250

  5. http://activemq.apache.org/download.html

  6. https://svn.wso2.org/repos/wso2/branches/carbon/3.2.0/service-stubs/org.wso2.carbon.service.mgt.stub/3.2.0/src/main/resources/ServiceAdmin.wsdl

Author

Rajika Kumarasiri, Senior Software Engineer, WSO2 Inc

About Author

  • Rajika Kumarasiri
  • Senior Software Engineer
  • WSO2 Inc.