
How to configure Axis2 to talk to a Qpid AMQP broker

By Suran Jayathilaka
Date: Sun, 1st Jun, 2008
Level: Intermediate

In this article, Suran Jayathilaka looks at configuring the JMS transport in Axis2/Java to talk to a Qpid AMQP broker.

Suran Jayathilaka

WSO2 Inc.

Applies To

  Apache Axis2/Java    1.4
  Apache Qpid/Java     M2.1
  JDK                  1.5 or higher
  Environment          Windows or Linux

Table of Contents

  1. Background
  2. Setup Qpid/Java broker
  3. Configure Axis2
  4. Create and deploy the service
  5. Create and run client
  6. Summary

1. Background

The Advanced Message Queuing Protocol (AMQP) is an application layer protocol for Message Oriented Middleware that is built on open standards. The defining features of AMQP are message orientation, queuing, routing (including point-to-point and publish-and-subscribe), reliability and security (source: Wikipedia).

According to amqp.org, the AMQP specification is being developed in response to internal requirements, market demand and electronic trading needs of partners, with the objective of providing a messaging infrastructure that provides businesses with a simple and more powerful way of connecting messaging dependent applications both within and between firms. The messaging infrastructure is intended to be:

  • broadly applicable for enterprise use
  • completely open
  • platform agnostic
  • interoperable

Simply put, AMQP is an open Internet Protocol for business messaging.

As a user of Apache Axis2, you may need to access a Web service hosted on Axis2 over AMQP. Apache Qpid, an open source implementation of AMQP, is fully JMS compliant and can therefore be used through the Axis2 JMS transport. This tutorial takes you through a simple, straightforward example of how this can be done using JMS queues and Qpid's direct exchange, which, as its name implies, maps messages directly to a destination using a routing key. A pure AMQP transport for Axis2 is in development; until it is fully functional, you can use Qpid's client libraries together with Axis2's JMS transport to communicate via AMQP.
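To make the direct exchange idea concrete, here is a toy, in-memory sketch in Java. It is not Qpid code: the class DirectExchangeSketch and all of its methods are hypothetical names made up for illustration. It only models the routing rule, namely that a message is delivered to the queues whose binding key exactly equals the message's routing key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange. Hypothetical names, not the Qpid API.
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<String, List<String>>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<String, List<String>>();

    // Declare a queue (if needed) and bind it to the exchange under the given key.
    void bind(String queueName, String bindingKey) {
        if (!queues.containsKey(queueName)) {
            queues.put(queueName, new ArrayList<String>());
        }
        List<String> bound = bindings.get(bindingKey);
        if (bound == null) {
            bound = new ArrayList<String>();
            bindings.put(bindingKey, bound);
        }
        if (!bound.contains(queueName)) {
            bound.add(queueName);
        }
    }

    // Route a message by exact key match; returns how many queues received it.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.get(routingKey);
        if (targets == null) {
            return 0; // no binding matches, so nothing is delivered in this model
        }
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Messages currently held by a queue (empty list for an unknown queue).
    List<String> messagesIn(String queueName) {
        List<String> messages = queues.get(queueName);
        return messages == null ? new ArrayList<String>() : messages;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("QpidStockQuoteService", "QpidStockQuoteService");
        System.out.println(exchange.publish("QpidStockQuoteService", "<getSharePrice/>"));
        System.out.println(exchange.publish("someOtherKey", "<ignored/>"));
    }
}
```

In the tutorial that follows, the queue name QpidStockQuoteService plays the role of the key in this model.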

This tutorial can be followed by anyone with a little know-how on Axis2 Web services. If you don't yet know how to create an Axis2 service archive (.aar), I recommend you follow the Axis2 Quickstart guide. Some knowledge of JMS (Java Message Service) would also be helpful. You can download the code for the sample from the link at the end of this article.

2. Setup Qpid/Java broker

To begin, download Axis2 (the latest stable release is Axis2-1.4, which can be downloaded from here) and extract it to a directory on your local file system. We will refer to this directory as AXIS2_HOME from here onwards.
For this tutorial, I will be using the Qpid M2.1 Java broker. Apache Qpid is an implementation of the AMQP specification, and M2.1 is the latest stable release of the Qpid Java broker. Download the Qpid/Java broker M2.1 binary distribution from here and extract it to your hard drive. We will call this folder QPID_HOME. To enable Axis2 to establish a connection with Qpid, you will need to copy the following libraries from the QPID_HOME/lib folder to the AXIS2_HOME/lib folder. If you are using a different version of the Qpid/Java broker, please substitute the correct versions of the following files:

  • geronimo-jms_1.1_spec-1.0.jar
  • qpid-client-1.0-incubating-M2.1.jar
  • qpid-common-1.0-incubating-M2.1.jar
  • slf4j-api-1.4.0.jar
  • slf4j-log4j12-1.4.0.jar
  • mina-core-1.0.1.jar
  • commons-collections-3.2.jar
  • commons-lang-2.1.jar
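If you want to confirm that the copy step worked, a small check like the following can help. This is only a convenience sketch: the class name QpidJarCheck is made up for this article, and the file names are simply the ones listed above for Qpid M2.1, so adjust them if you use another version.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reports which of the jars listed above are absent from a lib folder.
class QpidJarCheck {
    // File names taken from the list above (Qpid/Java M2.1).
    static final String[] REQUIRED = {
        "geronimo-jms_1.1_spec-1.0.jar",
        "qpid-client-1.0-incubating-M2.1.jar",
        "qpid-common-1.0-incubating-M2.1.jar",
        "slf4j-api-1.4.0.jar",
        "slf4j-log4j12-1.4.0.jar",
        "mina-core-1.0.1.jar",
        "commons-collections-3.2.jar",
        "commons-lang-2.1.jar"
    };

    // Returns the names of required jars that are not present in libDir.
    static List<String> missingJars(File libDir) {
        List<String> missing = new ArrayList<String>();
        for (String name : REQUIRED) {
            if (!new File(libDir, name).isFile()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Pass the path of AXIS2_HOME/lib as the first argument; "lib" is just a fallback.
        File libDir = new File(args.length > 0 ? args[0] : "lib");
        List<String> missing = missingJars(libDir);
        if (missing.isEmpty()) {
            System.out.println("All listed jars are present in " + libDir);
        } else {
            System.out.println("Missing from " + libDir + ": " + missing);
        }
    }
}
```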

Now, go to QPID_HOME/bin and run qpid-server (Linux) or qpid-server.bat (Windows) to start the Qpid broker.
Note that you can edit QPID_HOME/etc/virtualhosts.xml to configure the queues and other resources that are available on the broker, but that is outside the scope of this tutorial.

3. Configure Axis2

I will be using JNDI (Java Naming and Directory Interface) to provide the configuration parameters.
The instructions that follow assume the Axis2 standalone server. If you are using Axis2 deployed in a servlet container such as Tomcat, please substitute the appropriate directory locations (e.g. in Tomcat, AXIS2_HOME would be TOMCAT_HOME/webapps/axis2 and the conf folder would be TOMCAT_HOME/webapps/axis2/WEB-INF/conf).
In the axis2.xml configuration file in the AXIS2_HOME/conf folder, in the "Transport Ins" section, you will find the JMS transport receiver, which is commented out by default.
Here, you can configure the connection factories that will be used by the transport. The default configuration is for an ActiveMQ JMS broker. You should see three connection factories defined, namely "QueueConnectionFactory", "TopicConnectionFactory" and "default". A service can specify which connection factory it wants to use. You can easily add a configuration to use a Qpid broker as follows.
You can either uncomment and edit the necessary parts of axis2.xml, or copy and paste the following XML fragment and change the "java.naming.provider.url" parameter to suit your environment.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
  <parameter name="default">
    <parameter name="java.naming.factory.initial">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
    <parameter name="java.naming.provider.url">/home/username/axis2-1.4/conf/server.properties</parameter>
    <parameter name="transport.jms.ConnectionFactoryJNDIName">qpidConnectionfactory</parameter>
    <parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
  </parameter>
</transportReceiver>


If you are using version 1.4 of Axis2, you can use the Axis2 JMS transport, i.e. org.apache.axis2.transport.jms.JMSListener and org.apache.axis2.transport.jms.JMSSender. If you are running a recent SNAPSHOT of Axis2, or one built from the Axis2 SVN trunk source code, you will need to use the Synapse JMS transport instead, which is conveniently packaged with several other transports in the synapse-transports-<version>.jar. You can either download a Synapse binary distribution (1.1.1 or later) from here and take the synapse-transports-<version>.jar from its lib folder, or download that jar alone from the Maven repository. Then copy the synapse-transports-<version>.jar to the AXIS2_HOME/lib folder. After that, all you need to do is change the names of the JMSListener and JMSSender classes to org.apache.synapse.transport.jms.JMSListener and org.apache.synapse.transport.jms.JMSSender respectively.

 

Qpid's InitialContextFactory implementation is PropertiesFileInitialContextFactory, which loads the properties for its initial context from a properties file. The second parameter in the configuration above, "java.naming.provider.url", gives the file system location from which to load that properties file. In this file you can specify values for connection factories, destinations, queues and so on. The contents of the properties file are as follows.

The server.properties file

#initial context factory
#java.naming.factory.initial =org.apache.qpid.jndi.PropertiesFileInitialContextFactory

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.qpidConnectionfactory=amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'

# Register an AMQP destination in JNDI
# destination.[jndiName] = [BindingURL]
destination.directQueue=direct://amq.direct//QpidStockQuoteService

The connectionfactory.qpidConnectionfactory property holds the connection URL that the connection factory, looked up under the JNDI name qpidConnectionfactory, uses to connect to the broker. The destination.directQueue property is used by the service to designate which queue is its destination. The "direct://amq.direct//" prefix denotes that the Qpid broker's direct AMQP exchange is to be used for message routing.
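The key naming convention and the binding URL can be illustrated with a short standalone sketch. It uses only the JDK, not the Qpid libraries, and the class JndiPropertiesSketch is a hypothetical name. It assumes that a binding URL has the layout <exchange class>://<exchange name>/<destination>/<queue>, optionally followed by ?options; treat that layout as an assumption and check it against the Qpid documentation for your version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical illustration; the real parsing is done inside the Qpid client library.
class JndiPropertiesSketch {

    // Parts of a binding URL under the assumed layout described above.
    static final class Binding {
        final String exchangeClass;
        final String exchangeName;
        final String destination;
        final String queue;

        Binding(String exchangeClass, String exchangeName, String destination, String queue) {
            this.exchangeClass = exchangeClass;
            this.exchangeName = exchangeName;
            this.destination = destination;
            this.queue = queue;
        }
    }

    // Splits a binding URL into its parts; any ?options are ignored in this sketch.
    static Binding parseBinding(String url) {
        int schemeEnd = url.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Not a binding URL: " + url);
        }
        String rest = url.substring(schemeEnd + 3);
        int optionsStart = rest.indexOf('?');
        if (optionsStart >= 0) {
            rest = rest.substring(0, optionsStart);
        }
        String[] parts = rest.split("/", -1); // -1 keeps empty segments
        return new Binding(url.substring(0, schemeEnd), parts[0],
                parts.length > 1 ? parts[1] : "",
                parts.length > 2 ? parts[2] : "");
    }

    // Collects the entries whose key starts with the prefix, keyed by the JNDI name that follows it.
    static Map<String, String> entriesWithPrefix(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<String, String>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String text =
            "connectionfactory.qpidConnectionfactory=amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'\n"
            + "destination.directQueue=direct://amq.direct//QpidStockQuoteService\n";
        Properties props = new Properties();
        props.load(new StringReader(text));

        System.out.println("Connection factories: " + entriesWithPrefix(props, "connectionfactory.").keySet());
        for (Map.Entry<String, String> e : entriesWithPrefix(props, "destination.").entrySet()) {
            Binding b = parseBinding(e.getValue());
            System.out.println(e.getKey() + " -> exchange " + b.exchangeName
                    + " (" + b.exchangeClass + "), queue " + b.queue);
        }
    }
}
```

Under that assumed layout, the empty segment between the two slashes in "direct://amq.direct//QpidStockQuoteService" is the destination part, and QpidStockQuoteService is the queue name.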

I have chosen to go with the "default" connection factory, which means that it will be used by any service that uses the JMS transport unless the service explicitly declares another connection factory.
Also, in the "Transport Outs" section, find the JMS transport sender, then uncomment and edit it to read as follows:
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

This is simply the class responsible for sending out the service response via JMS, which travels as AMQP at the wire level.
That completes the Axis2 configuration.
Now, start the Axis2 server by invoking the axis2server.sh or axis2server.bat script in the AXIS2_HOME/bin folder. If the server starts without errors, you have successfully configured Axis2 to listen to the Qpid broker. Now stop the Axis2 server by pressing Ctrl+C.

4. Create and deploy the service

Now, we will write a simple service that will accept requests via JMS. For brevity's sake I will go with a very simple service that simulates a stock quote service with very limited capability. The code for the service class is as follows:

package org.wso2.example.axis2qpid.service;

import java.util.HashMap;
import java.util.Map;

public class QpidStockQuoteService {

 // Returns the price for a known symbol (case-insensitive), or 0.00 otherwise.
 public double getSharePrice(String symbol) {
  if (symbol == null) {
   return 0.00;
  }
  Double price = getStockTable().get(symbol.toLowerCase());
  return price == null ? 0.00 : price.doubleValue();
 }

 // Hard-coded sample data standing in for a real price source.
 private Map<String, Double> getStockTable() {
  Map<String, Double> map = new HashMap<String, Double>();
  map.put("ibm", Double.valueOf(50.00));
  map.put("wso2", Double.valueOf(75.00));
  map.put("msft", Double.valueOf(35.00));
  return map;
 }
}

The services.xml service descriptor for the service can be written as follows.  

<service name="QpidStockQuoteService" scope="application" targetNamespace="http://axis2qpid.wso2.org">
 <description>
 Stock Quote Service Over AMQP
 </description>
 <messageReceivers>
  <messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-only"
   class="org.apache.axis2.rpc.receivers.RPCInOnlyMessageReceiver"/>
  <messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-out"
   class="org.apache.axis2.rpc.receivers.RPCMessageReceiver"/>
 </messageReceivers>
 <parameter name="ServiceClass">org.wso2.example.axis2qpid.service.QpidStockQuoteService</parameter>
 <parameter name="transport.jms.Destination">directQueue</parameter>
</service>

The last parameter, "transport.jms.Destination", specifies that requests to this service should be sent to the destination registered in the JNDI properties file under the JNDI name "directQueue". Note that if you do not specify an explicit destination, the JMS transport will create a queue named after the service and listen on it.

Package the service class and the service descriptor into a service archive and copy the resultant .aar file to the AXIS2_HOME/repository/services folder. Now restart axis2server and check that the service has been deployed successfully by pointing your browser at http://localhost:8080/
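An .aar file is an ordinary zip archive with the service descriptor at META-INF/services.xml and the compiled classes in their package folders. Most people build it with the jar tool or a build script, but the layout can also be sketched in plain Java. The class AarPackagerSketch and the file paths in main are hypothetical placeholders; substitute the locations used in your own project.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper that writes a minimal Axis2 service archive.
class AarPackagerSketch {

    // Copies one file into the archive under the given entry name.
    static void addFile(ZipOutputStream zip, File source, String entryName) throws IOException {
        zip.putNextEntry(new ZipEntry(entryName));
        zip.write(Files.readAllBytes(source.toPath()));
        zip.closeEntry();
    }

    // Builds an archive holding services.xml and a single compiled class.
    static void buildAar(File classFile, String classEntryName, File servicesXml, File aarFile)
            throws IOException {
        ZipOutputStream zip = new ZipOutputStream(new FileOutputStream(aarFile));
        try {
            addFile(zip, servicesXml, "META-INF/services.xml");
            addFile(zip, classFile, classEntryName);
        } finally {
            zip.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder paths: adjust to wherever your compiler output and descriptor live.
        String classEntry = "org/wso2/example/axis2qpid/service/QpidStockQuoteService.class";
        buildAar(new File("build/classes/" + classEntry),
                classEntry,
                new File("resources/META-INF/services.xml"),
                new File("QpidStockQuoteService.aar"));
    }
}
```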

5. Create and run client


What remains is to whip up a simple JMS client and send a request to the service you deployed above. The code for that is as follows.

package org.wso2.example.axis2qpid;

import java.io.FileInputStream;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class QpidSenderReceiver {

 public static void main(String[] args) {
  try {
   // Load the JNDI settings (connection factory and destinations)
   Properties properties = new Properties();
   properties.load(new FileInputStream("direct.properties"));

   // Create the initial context
   Context ctx = new InitialContext(properties);

   // Look up the connection factory
   ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory");

   // Get the JMS destination queue
   Destination destination = (Destination) ctx.lookup("directQueue");

   // Get the JMS reply queue
   Destination replyDest = (Destination) ctx.lookup("replyQueue");

   Connection connection = conFac.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageProducer messageProducer = session.createProducer(destination);

   // Create the reply consumer and start delivery before sending,
   // so that the reply queue is in place when the service responds
   MessageConsumer consumer = session.createConsumer(replyDest);
   connection.start();

   // Create the text message; its body is the request payload
   String body = "<getSharePrice xmlns='http://service.axis2qpid.example.wso2.org'><symbol>wso2</symbol></getSharePrice>";
   TextMessage message = session.createTextMessage(body);
   message.setJMSReplyTo(replyDest);
   System.out.println("Request:" + message.getText());
   messageProducer.send(message, Message.DEFAULT_DELIVERY_MODE,
     Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

   // Block until the response arrives
   Message reply = consumer.receive();
   System.out.println("Response:" + ((TextMessage) reply).getText());
   connection.close();
   ctx.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

From the client's code you see that the body of the request TextMessage is an XML element named "getSharePrice", which is the name of the service method to be invoked. It is namespace qualified with "http://service.axis2qpid.example.wso2.org". This, you may note, comes from the package name of the service implementation class.
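The relationship between the package name and the namespace used in the request can be shown with a few lines of Java. This is only an illustration of the naming pattern visible in this example (reverse the package segments and prefix "http://"); the class NamespaceFromPackage is a hypothetical name and is not how Axis2 itself is implemented. The authoritative value is always the one in the WSDL that Axis2 generates for your service.

```java
// Hypothetical illustration of the package-to-namespace pattern used in this example.
class NamespaceFromPackage {

    // Reverses the dot-separated package segments and prefixes "http://".
    static String toNamespace(String packageName) {
        String[] parts = packageName.split("\\.");
        StringBuilder ns = new StringBuilder("http://");
        for (int i = parts.length - 1; i >= 0; i--) {
            ns.append(parts[i]);
            if (i > 0) {
                ns.append('.');
            }
        }
        return ns.toString();
    }

    public static void main(String[] args) {
        // prints http://service.axis2qpid.example.wso2.org
        System.out.println(toNamespace("org.wso2.example.axis2qpid.service"));
    }
}
```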

The client uses JNDI to obtain the configuration it needs to establish a session with the Qpid broker. That configuration comes from the properties file loaded in the code above, direct.properties, which is as follows.

# initial context factory
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'

# Register an AMQP destination in JNDI
# destination.[jndiName] = [BindingURL]
destination.directQueue = direct://amq.direct//QpidStockQuoteService
destination.replyQueue = direct://amq.direct//queue

You will need to add the Qpid libraries to the classpath for this client to compile and execute.

 

6. Summary 

In this tutorial you learned how to invoke a Web service over AMQP, by configuring Axis2/Java to talk with a Qpid broker using Axis2's JMS transport. Granted, the scenario described above is extremely simple, but it should provide you with a base for doing really fancy things with AMQP and Axis2. For any clarifications or details on how to go further with Axis2, Synapse or Qpid, all you have to do is drop emails to the mailing lists of the respective projects. 


Author

Suran Jayathilaka, Software Engineer at WSO2. suran at wso2 dot com.

Attachment        Size
axis2-qpid.zip    4.89 KB