2010/12/15
15 Dec, 2010

How to Subscribe to an AMQP Queue using WS Eventing

  • Lahiru Gunathilake
  • Senior Software Engineer - WSO2

Contents

Introduction

 
This explains how to successfully do a subscription for an AMQP Queue. The configuration will be different based on the implementation you are using for AMQP and WS-Eventing. This tutorial is targeted for the users of WSO2 ESB to show the interoperability of AMQP with WSO2 ESB (including WS-standards). Our setup will be a very basic one which contains 5 components.
 
  • AMQP client - initiate the message flow by sending a sample message to Apache Qpid Java broker.
  • Apache Qpid Java broker - AMQP implementation, which act as the message broker.
  • WSO2 ESB - listening to a defined queue in Apache Qpid and pick the messages from Qpid when a new message arrived to the Queue.
  • WSO2 Event Source - Event Source, to handle subscriptions and notifications.
  • WSO2 Event Listener - Web Services hosted in simple Axis server which is listening to the Queue in Apache Qpid.
  To setup this scenario, we are using WSO2 ESB as the WS-Eventing implementation and WSO2 ESB is going to act as the middleware for this setup. Please refer to the setup guide for Apache Qpid Java broker with WSO2 ESB [1] and configure WSO2 ESB to listen to Apache Qpid Java broker. With this setup, if we create a proxy service with the name “QpidEventingService” and update the Service Bus Configuration Queue will be created in Apache Qpid with the proxy service name.If a message arrive to that Queue appropriate proxy service’s in sequence will be executed. Incoming message will be the message came to the Queue. Based on the message content you need to change the content type of the proxy service. In the below example we assume messages coming to Qpid will be plain/text messages. Incoming message will be a JMS message appropriate to the AMQP message arrived at Qpid Queue (Qpid Client that we use at ESB end, converts native AMQP messages to a JMS and hands over to ESB). So, as an initial step you can create a sample proxy service in ESB.
 

Applies To

Product Version
WSO2 ESB 3.0.0/3.1.0

 

 

 

 

Configure WSO2 ESB

 
Create an EventSource with default configurations, so the Synapse configuration for the Event Source will be as shown below. 
 
                          <eventSource name="SampleEventSource">
        <subscriptionManager class="org.apache.synapse.eventing.managers.DefaultInMemorySubscriptionManager">
            <property name="topicHeaderNS" value="https://apache.org/aip"/>
            <property name="topicHeaderName" value="Topic"/>
        </subscriptionManager>
 
Now, we need to configure our proxy service to filter the subscription based on the queue name message came from. As a first step let's create a sequence (PublicEventSource) which will be the in sequence of the proxy service created above.                                                                                                                                                                                                          
 Note: Before proceeding, it's good to understand how WSO2 WS-Eventing implementation works in this scenario. When we subscribe to a particular Queue, we first need to send a subscription message to the Event Source (This is again a service hosted in WSO2 that specifically handles Event subscriptions and notifications) created in the step above. There, we need to mention what queue we want to subscribe to and where Event Source should send the notifications to. In our Eventing implementation, internal Event dispatching for subscribers happens by filtering a message Header with the name "Topic".    

Ex: If a message hits in “PublicEventSource” sequence with Header Topic="QpidEventService" and if there is a subscription for this Topic, we will notify the subscribers Endpoint by sending the current message to the endpoint (If there are mediators, the message will go through them and the result will be sent to the notification endpoint), provided in subscription process. All the incoming messages from Apache Qpid are having their Queue name in a transport level property called  JMS_DESTINATION and we need to pick that property and set it as the Header property with name as Topic. After successfully setting Header Topic, Event Source will filter the event based on the subscriptions and send the notifications.

To get a better understanding about WSO2 Eventing implementation, please read this article[2].
As I have explained in the step above, we need to pick a property from the incoming JMS message and set the Queue Name to Header property Topic, so that the “PublicEventSource” configuration will look like this.  

   <sequence name="PublicEventSource">
          <property xmlns:ns2="https://org.apache.synapse/xsd" xmlns:ns="https://org.apache.synapse/xsd" name="Topic" expression="get-property('transport','JMS_DESTINATION')"/>
  <eventPublisher eventSourceName="SampleEventSource"/> </sequence> 
 
Now you can see that PublicEventSource is binded to the EventSource created initially and we are setting the Topic header from the incoming JMS message, if the message come from different Queue other than subscribed Queue it will not be send to anyone. To demonstrate the subscriber will request to send the notification to SimpleStockQuoteService so in this sequence I will be changing the current message to a suitable message matched to SimpleStockQuoteService. So final configuration for PublicEventSource will looks like below.
 
<sequence name="PublicEventSource">
        <property xmlns:ns2="https://org.apache.synapse/xsd" xmlns:ns="https://org.apache.synapse/xsd" name="Topic" expression="get-property('transport','JMS_DESTINATION')" scope="transport"/>
        <header xmlns:aip="https://apache.org/aip" xmlns:ns2="https://org.apache.synapse/xsd" xmlns:ns="https://org.apache.synapse/xsd" name="aip:Topic" expression="get-property('transport','JMS_DESTINATION')"/>
        <log level="full"/>
        <enrich>
            <source type="inline">
                <m:placeOrder xmlns:m="https://services.samples">
                    <m:order>
                        <m:price>10.10</m:price>
                        <m:quantity>1000</m:quantity>
                        <m:symbol>Message 1</m:symbol>
                    </m:order>
                </m:placeOrder>
            </source>
            <target type="body"/>
        </enrich>
        <eventPublisher eventSourceName="SampleEventSource"/>
 
Next step is to bind the sequence with the proxy service created above. In this configuration I assume the messages we pick from Apache Qpid are text/plain. So the proxy service configuration will look like this.

 <proxy name="QpidEventingService" transports="jms http" startOnLoad="true">
        <target inSequence="PublicEventSource"/>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>contentType</jmsProperty>
                <default>text/plain</default>
            </rules>
 </parameter>
 
 

Initiating Notification Endpoint

 
To start “SimpleStockQuoteService”, run ant command in “ESB_HOME/samples/axis2Server/src/SimpleStockQuoteService” and run axis2Server.sh or axis2Server.bat script at “ESB_HOME/samples/axis2Server”. We have an endpoint to send the notification when a message comes to a defined AMQP Queue.
 
 

Creating a client to send a sample message to Qpid

Next step is to find how to send a sample message to an Apache Qpid Queue, for this checkout the source of Apache Qpid Client from the tag [3] and point you classpath to QPID_HOME/lib. Apply the patch[4] and compile Client.java . Or if you are familiar with Apache Qpid, you can use clients written in Java, .NET, Python and C++ to send a sample message to a defined queue.
 
 

Send the subscription Message

To subscribe you need to send a valid subscription SOAP message to EventSource. Open “ESB_HOME/samples/axis2Client/build.xml” and change the following properties,
<property name="address" value="https://localhost:9000/services/SimpleStockQuoteService"/>   Notification URL

<property name="addurl" value="https://localhost:8280/services/SampleEventSource"/>  EventSource URL

<property name="topic" value="QpidEventingService"/>   Topic definition for this subscription
 
Change directory to “ESB_HOME/samples/axis2Client” and run "ant eventsubscriber" which will subscribe to the Queue QpidEventingService. Or else, use static subscriptions in Service Bus configuration and you can skip the above step.
 

Conclusion

To test the above setup, send a message to QpidEventingService queue by running previously compiled sample code, which will send a message to queue QpidEventingService. You should now see a message that has come to your SimpleStockQuoteService hosted in Axis2 Server. 

Author: Lahiru Gunathilaka, Senior Software Engineer, WSO2 Inc

 

About Author

  • Lahiru Gunathilake
  • Senior Software Engineer
  • WSO2 Inc.