2011/11/03
3 Nov, 2011

WSO2 ESB with Oracle™AQ as Messaging Media

  • Vijayaratha Vijayasingam
  • - WSO2

In this tutorial, Oracle™AQ is referred as OracleAQ and WSO2 ESB is referred as ESB.

Contents :

Applies To :

WSO2 ESB 4.0.0
Oracle Database 11g(11.2.0)
ApacheDS 1.5.3

If you have configured OrcaleAQ and LDAP server successfully, skip these steps and follow from ESB configurations. For the completeness of the tutorial, following steps have been added.

1. Oracle database installation and configurations

As a first step you have to download and install Oracle database server. You can download it here. In this setup Oracle11g( v11.2.0 ) database is used. Oracle11g comes with the required libraries to talk to AQ via OracleJMS.

Steps :

Oracle defines privileges in several levels. Lets create a user, who needs to have authorization to create Queues and Queuetables. I assume that you can use Sqlplus. (It is an Oracle command-line utility program to run sql and pl/sql commands)

  1. Go to command prompt and execute following commands. Here, minimum set of authorizations for a particular user, is listed out. You may need to grant more privileges for the user.
  2. Sqlplus (Create a user='jmsuser' password='jmsuser')
    • #sqlplus sys/admin@orcl as sysdba;
    • #create user jmsuser identified by jmsuser account unlock;
    • #grant create session,dba to jmsuser;
    • #grant connect, resource to jmsuser;
    • #grant aq_administrator_role to jmsuser  identified by  jmsuser;
    • #grant execute on dbms_aq to  jmsuser;
    • #grant execute on dbms_aqadm to jmsuser;
    • #exec dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','jmsuser');
    • #exec dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','jmsuser');
    • #grant execute on sys.aq$_jms_text_message to jmsuser;
    • #exit
  3. Now try to connect as the newly created user (ie: jmsuser)

1.1 Java client to create a queue

Simple java client is used to create a queue. (you can use pl/sql script also to create queue)

1.1.1 Create a connection to the database server

Use the newly created user and password.

QueueConnection getConnection() {
    String hostname = "localhost";
    String oracle_sid = "orcl";
    int portno = 1521;
    String userName = "jmsuser";
    String password = "jmsuser";
    String driver = "thin";
    QueueConnectionFactory QFac = null;
    QueueConnection QCon = null;
    try {
      // get connection factory 
      QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
      // create connection
      QCon = QFac.createQueueConnection(userName, password);
     } catch (Exception e) {
	e.printStackTrace();
     }
	return QCon;
}

1.1.2 Create a queue

Before creating a queue, you need to create a 'queuetable'. Queuetable contains reference to the queue and message type. Lets try to create a queue which supports JMS plain text messages.
If you like to support other types of JMS messages, the particular user must be granted for the privileges.(we already granted privilege for the user to use handle JMS text messages)

void createQueue() {
    try {
      AQQueueTableProperty qt_prop;
      AQQueueTable q_table = null;
      AQjmsDestinationProperty dest_prop;
      Queue queue = null;
      Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
      qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
      q_table = ((AQjmsSession) session).createQueueTable(user, qTableName, qt_prop);
      System.out.println("Qtable created");
      dest_prop = new AQjmsDestinationProperty();
      /* create a queue */
      queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
      System.out.println("Queue created");
     /* start the queue */
      ((AQjmsDestination) queue).start(session, true, true);
      } catch (Exception e) {
	 e.printStackTrace();
 	 return;
     }
}

1.1.3 Send text messages to queue

After the successful creation of the queue, you are able to send messages to that particular queue.

void sendMessage(String user, String queueName) {
   try {
	QueueConnection QCon = getConnection();		
	Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
	QCon.start();
	Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
	MessageProducer producer = session.createProducer(queue);
	TextMessage tMsg = session.createTextMessage("test");
	producer.send(tMsg);
	System.out.println("Sent message = " + tMsg.getText());
	session.close();
	producer.close();
	QCon.close();
   } catch (JMSException e) {
 	e.printStackTrace();
	return;
  }
}

1.1.4 Consume messages

Try to browse/consume messages from the newly created queue.

void consumeMessage(String user, String queueName) {
  Queue queue;
   try {
	QueueConnection QCon = getConnection();		
	Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
	QCon.start();
	queue = ((AQjmsSession) session).getQueue(user, queueName);
	MessageConsumer consumer = session.createConsumer(queue);
	TextMessage msg = (TextMessage) consumer.receive();
	System.out.println("Msg Received " + msg.getText());
	consumer.close();
	session.close();
	QCon.close();
	} catch (JMSException e) {		
          e.printStackTrace();
	}
}

Note: You might need following jars at your class path to run above client. These jars come with the Oracle database distribution.

  • ojdbc6.jar
  • jta.jar
  • jmscommon.jar
  • aqapi.jar

(Sample AQ Java client is available as attachment.)

Now, it is sure that messages can be sent and received without any issues to the OracleAQ.

2. LDAP server installation and configuration(ApacheDS)

To enable Java Naming and Directory Interface (JNDI) lookup in OJMS, you need to register the database with LDAP server. JMS administrator can register ConnectionFactory objects in a LDAP server.

Here, ApacheDS is used as the sample LDAP server. (you can use whatever LDAP server you like). You must have following structure of Oracle Streams AQ entries in the LDAP server .

As in OracleAQ guide, LDAP directory structure would be;
"A structure with four levels. The top level, labeled cn=acme, cn=com, is the administrative context. The second level, labeled cn=OracleContext, is the root of the Oracle RDBMS schema. The third level, labeled cn=db1, is the database. The fourth level splits into three nodes. They are Connection Factories, labeled cn=OracleDBConnections, Queues/Topics, labeled cn=OracleDBQueue, and Other DB Objects, labeled cn=... "

2.1 ApacheDS installation and configuration

Download ApcheDS here , which comes with eclipse based LDAP browser and LDAP server.

  • Download , install and start the Apache Directory Studio™ (v1.5.3)
  • Go to 'File' menu and click 'New'. Select 'ApcheDS' server. Go to 'next' page.
  • Provide a unique name to identify your LDAP server instance.(eg: apacheds) Click 'Finish'. You will see your newly created server at servers panel.
  • Double click on server instance (ie:apacheds's) link.You will see a window which lists all options to configure the server.
  • At the 'General' tab, you can provide the port numbers for different protocols. Here leave the default port numbers as it is.
    • LDAP =10389
    • LDAPS=10636

  • Go to 'servers' panel and click the 'start' button.
  • Now, server is started. Need to create a connection for that server.
  • Go to 'LDAP' menu and select 'New Connection' . Provide;
    • Connection name - oracleAQ
    • Hostname - localhost
    • Port no -10389 (This is the port number 'apacheds' server instance running)
  • Click 'Next'.
    • Authentication Method - "Simple Authentication"
    • Bind DN - "ou=system,uid=admin" (this is the one by default available partition and the user)
    • Bind password - 'secret' (default  passoword)

  • If you provide all the above parameters correctly , your connection will start successfully. You can see the available partitions at LDAP browser.

That is it for ApacheDS installation and configuartion.

2.2 Partition for OracleAQ entries directory structure

To create new partition, which is going to be used to keep Oracle Streams AQ entries in the LDAP server, I have used an oracle.schema(available as an attachment) file, which contains all ObjectClasses definitions needed by AQ.

  • Import that to ApacheDS and create a new schema project. Then you can export that as *.ldif file. And again import the *.ldif file to your LDAP server. If this step is successfully done,“oracle specific object classes” can be used.

Let us create a new partition to keep OracleAQ entries.

Steps :

  • Double click on server link. You will see the server.xml's graphical view. From that select 'Partitions' tab.
  • Under partitions section you will see default two partitions are listed out.
    • System
    • example
  • Click on the 'Add' button. You will see a new partition will be created. Provide ID and suffix. Click on 'Save' button @ toolbar.
    • ID='Services'
    • suffix='ou=Services, o=sgi,c=us'

You can do this at server.xml by adding new <jdbmPartition>. Go to the ApcheDS installation directory find the server.xml and copy one of the existing "example" partition.

Now you have created a new partition called "Services". But you can not view that at LDAP browser. To view at browser, need to create a new 'Context entry'.

  • Go to LDAP browser,'Root DSE '-->Right click->'New Context entry' . Select 'Create entry from scratch' option.
  • Select 'ObjectClass' as 'Organizationalunit' click  'Add'. So you will see added object classes at your right pane.
  • Now enter the DN as you provided when you create the partition.
    • Eg: ou=Services, o=sgi,c=us
  • Click 'next' and Finish. You will see the newly created entry at LDAP browser .
  • To connect to this newly created partition , you have to create a uid. Rightclick-->new entry-->select ObjectClass as 'inetoperson'.
    • Select base DN as dn='ou=Services, o=sgi,c=us'
    • uid='ratha'
    • Provide password for the uid ='secret' (just provide the default password)
  • Now user is created. Let us try to create entries for AQ. According to OracleAQ entries structure, the second level would be,cn=OracleContext, which is the root of the Oracle RDBMS schema.
    • Rightclick->New entry-->Select 'use existing entry as template'
    • Provide 'ou=Services, o=sgi,c=us'.
    • Click 'Next' and select following 'javacontainer' as ObjectClass. Provide 'cn=oraclecontext'.
  • Likewise create other entries also. You can check the attachment "oracle.ldif", to find the attributes and ObjectClasses belongs to various levels.
  • Finally, your LDAP browser will look like this;

AQ entries in LDAP server is successfully created now. The above mentioned settings are totally related to OracleAQ and the JNDI lookup for OracleJMS. Now we will look at the ESB side configurations for OracleAQ.

3. ESB transport configurations

To support JMS tarnsport we have to enable 'jms' transport at axis2 configuration. Find the axis2.xml and edit as following;
(you must be carefull when you provide the base DN and connection parameters)

Note: ESB, by default comes with LDAP user store. You might need to change your configured ldap server port number(just change at server.xml)

3.1 Axis2 transport level configuration

To enable JMS transport, you need to un comment the 'JMS' transport sender and receiver in order to send and receive messages to/from OracleAQ.
Here LDAP server is used as the data source, which can be accessible via JNDI API.

Edit axis2 configuration as follows;

<!-- ================================================= -->
<!--             Transport Ins (Listeners)             -->
<!-- ================================================= -->
 
<!--ORACLEAQ-->

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
  <parameter name="myQueueConnectionFactory" locked="false">
    <parameter name="java.naming.factory.initial" locked="false">com.sun.jndi.ldap.LdapCtxFactory</parameter>
    <parameter name="java.naming.provider.url" locked="false">ldap://localhost:10388/</parameter>
    <parameter name="server_dn" locked="false">cn=ORCL,cn=OracleContext,ou=Services, o=sgi,c=us</parameter>
    <parameter name="java.naming.security.principal" locked="false">uid=ratha,ou=Services, o=sgi,c=us</parameter>
    <parameter name="java.naming.security.credentials" locked="false">secret</parameter>
    <parameter name="java.naming.security.authentication" locked="false">simple</parameter>
    <parameter name="transport.jms.UserName" locked="false">jmsuser</parameter>
    <parameter name="transport.jms.Password" locked="false">jmsuser</parameter>			
    <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">cn=ratha,cn=oracledbconnections,cn=ORCL,
cn=OracleContext,ou=Services, o=sgi,c=us</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> </parameter> <parameter name="default" locked="false"> <parameter name="java.naming.factory.initial" locked="false">com.sun.jndi.ldap.LdapCtxFactory</parameter> <parameter name="java.naming.provider.url" locked="false">ldap://localhost:10388/</parameter> <parameter name="server_dn" locked="false">cn=ORCL,cn=OracleContext,ou=Services, o=sgi,c=us</parameter> <parameter name="java.naming.security.principal" locked="false">uid=ratha,ou=Services, o=sgi,c=us</parameter> <parameter name="java.naming.security.credentials" locked="false">secret</parameter> <parameter name="java.naming.security.authentication" locked="false">simple</parameter> <parameter name="transport.jms.UserName" locked="false">jmsuser</parameter> <parameter name="transport.jms.Password" locked="false">jmsuser</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">cn=ratha,cn=oracledbconnections,cn=ORCL,
cn=OracleContext,ou=Services, o=sgi,c=us</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> </parameter> </transportReceiver>
<!-- ================================================= -->
<!--             Transport Outs (Senders)              -->
<!-- ================================================= -->

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

Note:

1. Above configurations are based on the ApcheDS configuration we made above. You might need to change parameter values according to your LDAP server configurations.

2. You should keep following jars in your "ESB_HOME\repository\components\lib" folder.

  • aqapi.jar
  • jmscommon.jar
  • jta.jar
  • ojdbc6.jar

If you successfully configure above step, server will start to listen to the queue, which we have created already.

4. Switching transport - JMS to HTTP (Sample 250)

With the above configurations, let us try to execute the standard ESB sample 250 with OracleAQ as a messaging media. In this sample, ESB listens to the JMS queue(OracleAQ) for the messages and whenever messages arrive to the queue, ESB picks them and send back to the back end service via HTTP.

Steps :

  • Deploy the sample stock quote service and start the axis2esrver.
    • Run "ant" at 'ESB_HOME\samples\axis2Server\src\SimpleStockQuoteService' directory level. This will build the SimpleStockQuote service and deploy at the axis2 server.
    • Run the axis2server.(#axis2server.bat/sh . Script is at 'ESB_HOME\samples\axis2Server')
  • Edit the JMS transport listener and sender to point OracleAQ as mentioned above. (ESB_HOME\repository\conf\axis2.xml) Check the attachment.
  • Start the ESB server.
  • Create 'StockQuoteProxy' as described below.
 <proxy name="StockQuoteProxy" transports="https http jms" startOnLoad="true" trace="disable">
        <target>
            <endpoint>
                <address uri="https://localhost:9000/services/SimpleStockQuoteService"/>
            </endpoint>
            <inSequence>
                <log level="full"/>
                <enrich>
                    <source type="body" clone="true"/>
                    <target type="property" property="jms_body_text"/>
                </enrich>
                <property name="jms_body_text" expression="get-property('jms_body_text')" scope="default"/>
                <xslt key="jmsMsgToSoapMsg_xslt">
                    <property name="jms_text" expression="get-property('jms_body_text')"/>
                </xslt>
                <log level="full">
                    <property name="After transformation" value="************"/>
                </log>
            </inSequence>
            <outSequence>
                <log level="full">
                    <property name="OUT SEQUENCE" value="************"/>
                </log>
            </outSequence>
        </target>
        <publishWSDL uri="file:repository/samples/resources/proxy/sample_proxy_1.wsdl"/>
        <parameter name="transport.jms.ContentType">
            <rules>
                <jmsProperty>ContentType</jmsProperty>
                <default>text/plain</default>
            </rules>
        </parameter>
        <parameter name="transport.jms.Destination">cn=ratha.test,cn=OracleDBQueues,cn=ORCL,cn=OracleContext,
ou=Services, o=sgi,c=us</parameter> </proxy> <localEntry key="jmsMsgToSoapMsg_xslt" src="file:jmsMsgToSoapMsg.xslt"/>

Here we try to send text messages to the queue. To process them, ESB constructs a default SOAP message out of it and send back to the service. A 'XSLT' script is used to transform the default SOAP message to the actual request, which is needed in order to execute the back end service (ie: StockQutoe service).

  • Run the Javaclient and send a text message to queue (eg: 'test')
  • Set 'SOAPAction' property when you send a text message to execute a particular operation.
textMessage.setStringProperty("SOAPAction","getQuote" ); 

Successful message invocation will print following log at your axis2 server console;

   samples.services.SimpleStockQuoteService :: Generating quote for : test

You can see the following log at your ESB console, where it is started to listen to the JMS destination.

    [2011-10-09 22:53:21,288]  INFO - JMSListener JMS listener started
    [2011-10-09 22:53:21,299]  INFO - ServiceTaskManager Task manager for service : StockQuoteProxy [re-]initialized
    [2011-10-09 22:53:23,301]  INFO - JMSListener Started to listen on destination : cn=ratha.test,cn=Or
    acleDBQueues,cn=ORCL,cn=OracleContext,ou=Services, o=sgi,c=us of type queue for service StockQuotePr
    oxy

5. Switching transport - HTTP to JMS (Sample 251)

In this sample, when you send 'http' request to proxy, ESB sends the request message to the 'queue' and 'JMS' enabled back-end service listens to that queue and picks up messages whenever queue receives and executes it.

Steps:

  • For this, sample axis2server (which comes with ESB distribution) is configured to listen to the Queue. Edit sample axis2 server's transport configuration as mentioned above. (ie: @ axis2server's axis2.xml)
  • Add the additional following parameter to 'service.xml' file of 'SimpleStockQuoteService'; (This is the 'DN' value for the created queue)
<parameter locked="false" name="transport.jms.Destination">cn=ratha.test,cn=OracleDBQueues,cn=ORCL,cn=OracleContext,   
ou=Services, o=sgi,c=us</parameter>
  • Rebuild the service and deploy it to the server.
    • Go to "ESB_HOME\samples\axis2Server\src\SimpleStockQuoteService", run #ant
    • Run the axis2server.bat/sh
  • Execute the ant client;
    • Go to "ESB_HOME\samples\axis2Client", run as;
     ant stockquote -Daddurl=https://localhost:8280/services/StockQuoteProxy -Dmode=placeorder -Dsymbol=MSFT
  • Start ESB server.
  • Create 'StockQuoteProxy' proxy as follows; you need to provide all required 'JMS' parameters in order to make a successful connection to the LDAP server.
  • <proxy name="StockQuoteProxy" transports="http" startOnLoad="true">
      <target>
        <endpoint>
          <address	uri="jms:/cn=ratha.test,cn=OracleDBQueues,cn=ORCL,cn=OracleContext,ou=Services, o=sgi,c=us?
    transport.jms.ConnectionFactoryJNDIName=cn=ratha,cn=oracledbconnections,cn=ORCL,cn=OracleContext,ou=Services, o=sgi,c=us&
    java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory&
    java.naming.provider.url=ldap://localhost:10388/&
    transport.jms.DestinationType=queue&
    server_dn=cn=ORCL,cn=OracleContext,ou=Services, o=sgi,c=us&
    java.naming.security.principal=uid=ratha,ou=Services, o=sgi,c=us&
    java.naming.security.credentials=secret&
    java.naming.security.authentication=simple&
    transport.jms.UserName=jmsuser&
    transport.jms.Password=jmsuser" /> </endpoint> <inSequence> <log level="full"> <property name="INSEQ" value="*****" /> </log> </inSequence> </target> </proxy>

    You will see the generated stock quote info at axis2server console.

    Note: Other carbon servers also can be configured like this in order to work with OracleAQ.

    References :

    Author:

    Vijayaratha Vijayasingam, Software Engineer, WSO2 Inc

     

    About Author

    • Vijayaratha Vijayasingam
    • wso2