Implementing Store and Forward Messaging Patterns with WSO2 ESB Part 2
- By Charith Wickramarachchi
- 4 Jan, 2012
This content last curated on 28/03/2012
Applies to
WSO2 ESB | 4.0.3 |
WSO2 MB | 1.0.2 |
Introduction
WSO2 Enterprise Service Bus is a lightweight, high-performance Enterprise Service Bus that is recognised for its performance, user friendliness and first-class support for enterprise application integration patterns. The WSO2 ESB 4.0.0 release introduced the capability of implementing store and forward messaging patterns within the mediation flow. This article is a continuation of “Implementing Store and Forward Messaging patterns with WSO2ESB part 1”.
This article first describes the Message Forwarding Processor and its operating modes. It then explains how to integrate the ESB message stores with message brokers to achieve more reliability. The final part contains sample configurations that demonstrate some use cases.
Scheduled Message Forwarding Processor
The Scheduled Message Forwarding Processor is another Message Processor that ships with the ESB. It can be used to reliably deliver messages between an ESB endpoint and a backend service.
Like any other message processor, it keeps the messages to be processed in a Message Store, which in this case acts as a message queue for the processor. The following are the main steps that a Message Forwarding Processor takes when it processes a message.
1. Peek (take a copy of the first message) a message from the message queue (Message Store). If the queue is empty, exit the job so that it can re-run on the next schedule.
2. If it is an out-only invocation, do a robust send to the backend service and go to step 4. If it is an in-out invocation, go to step 3.
3. Do an in-out invocation to the backend service and go to step 5.
4. If an error occurs, exit the job without removing the message from the queue so that the job can be retried on the next schedule. If the invocation is successful, go to step 6.
5. If an error occurs, set the property “blocking.sender.error=true” in the message context, invoke the sequence “message.processor.fault.sequence” configured in the message processor with the original message, and exit the job so that it can re-run on the next schedule. If the invocation is successful, invoke the sequence “message.processor.reply.sequence” configured in the message processor with the response message and go to step 6.
6. Remove the message from the queue and go to step 1.
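Step 5 hands a failed in-out message to the fault sequence with the blocking.sender.error property set. A minimal sketch of such a sequence is shown below. The sequence name forwardingFaultSequence and the log property label SENDER_ERROR are assumptions chosen for illustration. Reading the property with get-property in the default scope is also an assumption, based on the description above.

```xml
<!-- Hypothetical fault sequence; the name is an assumption for illustration -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="forwardingFaultSequence">
    <!-- Log the original message together with the error flag set by the processor -->
    <log level="full">
        <property name="SENDER_ERROR" expression="get-property('blocking.sender.error')"/>
    </log>
</sequence>
```

A processor would point at this sequence through its message.processor.fault.sequence parameter.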
The following diagrams show the behavior in the in-only and in-out invocation scenarios.
In only invocation scenario
In out invocation scenario
Note that in an 'in out' scenario you can use the response sequence to send the response to another service asynchronously, so that the application can correlate request and response at the application level. The idea of the response sequence is to handle the response of the invocation asynchronously. It can be used to do any processing of that response (eg: writing response data to a database, or transforming the message and invoking another service).
A Message Forwarding Processor can be in one of two states, active and inactive. In the active state it keeps processing messages according to the steps above, trying to deliver messages to their intended endpoints. Optionally, users can configure a maximum number of redelivery attempts so that the processor is automatically deactivated after that number of failed attempts.
When a Message Forwarding Processor is in the inactive state, users can delete, resend and view messages in the message queue using the ESB management console or the registered JMX MBeans. Some of these operations are not available when a JMSMessageStore is the underlying message store: in that case, deleting or resending arbitrary messages is not allowed.
There are a few Message Context properties and configuration parameters that are useful when using the Message Forwarding Processor [1].
Message Forwarding Processor Parameters
Parameter Name | Value | Required |
interval | Message retry interval in milliseconds | No (the default value is 1000 ms) |
max.deliver.attempts | Maximum failed redelivery attempts before deactivating the processor | No |
axis2.repo | Axis2 client repository to be used to send messages | No |
axis2.config | Axis2 configuration to be used to send messages | No |
message.processor.reply.sequence | Name of the sequence that the reply of the message should be sent to | No |
message.processor.fault.sequence | Name of the sequence that the fault message should be sent to in case of a SOAP fault | No |
quartz.conf | Quartz configuration file path | No |
cronExpression | Cron expression to be used to configure the retry pattern | No |
Message Context Properties associated with Message Forwarding Processor
Property Name | Value | Required |
target.endpoint | Name of the Address Endpoint to which the message should be delivered | Yes |
OUT_ONLY | true if this is an out-only message | Required in messages that are intended for out-only invocations |
JMS Message Store
JMS message store is a Message Store implementation that is shipped with WSO2 ESB which uses a JMS Queue as its underlying storage. This can be used as a persistence store for messages to achieve more reliability in scenarios where messages are stored and accumulated for a long time.
The JMS Message Store is configured by specifying the implementation class org.wso2.carbon.message.store.persistence.jms.JMSMessageStore in the message store configuration. Additional configuration details needed to connect to the JMS queue are provided via parameters in the message store configuration [2].
Parameter Name | Value | Required |
java.naming.factory.initial | Initial context factory to use to connect to the JMS broker | Yes |
java.naming.provider.url | URL of the naming provider to be used by the context factory | Yes |
store.jms.destination | JNDI name of the queue that the message store connects to | No, but some JMS clients need it |
store.jms.connection.factory | JNDI name of the connection factory used to create JMS connections | No, but some JMS clients need it |
store.jms.username | User name used to create the connection with the broker | No |
store.jms.password | Password used to create the connection with the broker | No |
store.jms.JMSSpecVersion | JMS API specification version to use, 1.1 or 1.0 (default 1.1) | No |
store.jms.cache.connection | true/false, enables connection caching | No |
Configuring JMS Message Store with WSO2 Message Broker
This section describes how to configure a Message Store to work with a JMS queue in WSO2 Message Broker. (Note: ESB_HOME is the WSO2 ESB installation directory and MB_HOME is the WSO2 MB installation directory.)
- Open carbon.xml in the MB_HOME/repository/conf directory and set the offset value to 1 (eg: <Offset>1</Offset>)
- Open qpid-virtualhosts.xml in the MB_HOME/repository/conf/advanced directory and set the default virtual host to carbon (eg: <default>carbon</default>)
- Start WSO2 Message Broker.
- Open the jndi.properties file in the ESB_HOME/repository/conf directory and change the broker port to 5673 (eg: connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673')
- Add the entry queue.JMSMS=JMSMS to the jndi.properties file in ESB_HOME/repository/conf
- Start WSO2 ESB
Now you can configure the JMS Message Store with the following configuration.
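A minimal sketch of such a message store definition follows. The store name JMSMS and the destination match the queue.JMSMS entry added to jndi.properties above, and the class name is the one given earlier. The initial context factory class and the relative path to jndi.properties are assumptions for a Qpid-based broker client and may differ in your installation.

```xml
<!-- Sketch of a JMS-backed message store; factory class and provider URL are assumptions -->
<messageStore xmlns="http://ws.apache.org/ns/synapse"
              class="org.wso2.carbon.message.store.persistence.jms.JMSMessageStore"
              name="JMSMS">
    <!-- Assumed JNDI context factory for a Qpid-based client -->
    <parameter name="java.naming.factory.initial">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
    <!-- Assumed location of the jndi.properties file edited above -->
    <parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
    <!-- JNDI name of the queue registered as queue.JMSMS=JMSMS -->
    <parameter name="store.jms.destination">JMSMS</parameter>
</messageStore>
```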
In the same way you can configure the WSO2 ESB JMS Message Store to work with any other JMS message broker. In that case the configuration parameter values must be changed to appropriate values and the JMS client libraries must be installed in WSO2 ESB.
Sample Scenarios with Message Forwarding Processor and JMS Message Store
'In Only' service invocation with Message Forwarding Processor.
This example shows how to do an 'in only' service invocation via the Message Forwarding Processor. It uses the SimpleStockQuoteService that ships with the WSO2 ESB distribution as the backend service.
In this scenario we invoke the placeOrder operation of the SimpleStockQuoteService, which is an in-only web service operation.
First we need to define an endpoint in WSO2 ESB that points to that service. The Message Forwarding Processor can only be used with Address Endpoints. I’ll use the following endpoint configuration.
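A sketch of such an Address Endpoint is shown below. The endpoint name SimpleStockQuoteService is the one referenced by the target.endpoint property in the proxy configuration of this scenario. The URI assumes the sample backend server is running locally on port 9000, so adjust it to wherever your backend is deployed.

```xml
<!-- Address endpoint for the backend; the URI is an assumption about a local sample server -->
<endpoint xmlns="http://ws.apache.org/ns/synapse" name="SimpleStockQuoteService">
    <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
</endpoint>
```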
For this sample I’ll use the JMS Message Store that we defined above. After creating the Message Store, the next thing to do is to create a Message Forwarding Processor. The processor configuration for this scenario looks like this.
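The following is a sketch of a processor definition for this scenario. The parameter names come from the parameter table earlier in the article, and max.deliver.attempts is set to 4 to match the deactivation behavior described for this sample. The processor name, the 10 second interval and the implementation class name are assumptions that should be verified against your ESB version.

```xml
<!-- Sketch of a forwarding processor bound to the JMSMS store; name, class and interval are assumptions -->
<messageProcessor xmlns="http://ws.apache.org/ns/synapse"
                  class="org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor"
                  name="InOnlyForwardingProcessor"
                  messageStore="JMSMS">
    <!-- Retry interval in milliseconds -->
    <parameter name="interval">10000</parameter>
    <!-- Deactivate the processor after 4 failed delivery attempts -->
    <parameter name="max.deliver.attempts">4</parameter>
</messageProcessor>
```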
The only remaining part is to create an entry point for messages and implement the flow. For that I’ll use a proxy service with the following configuration.
<proxy xmlns="http://ws.apache.org/ns/synapse" name="InOnlyProxy" transports="https,http" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" />
            <property name="OUT_ONLY" value="true" />
            <property name="target.endpoint" value="SimpleStockQuoteService" />
            <log level="full" />
            <store messageStore="JMSMS" />
        </inSequence>
    </target>
</proxy>
Here we set the property FORCE_SC_ACCEPTED in the message flow so that the ESB sends an HTTP 202 status to the client after it accepts a message.
Since this is an out-only invocation from the ESB, we set the property OUT_ONLY in the message flow. The target endpoint for this message flow is defined in the message context property target.endpoint.
To try this scenario, shut down the backend service and send a placeOrder request message to the proxy service EPR.
The request carries a price of 1, a quantity of 1 and the symbol IBM.
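A placeOrder request with those values could look like the following sketch. The element names and the http://services.samples namespaces are assumptions based on the sample SimpleStockQuoteService, so check them against the service WSDL before use.

```xml
<!-- Sketch of a placeOrder request; namespaces and element names are assumptions -->
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:ser="http://services.samples"
                  xmlns:xsd="http://services.samples/xsd">
    <soapenv:Header/>
    <soapenv:Body>
        <ser:placeOrder>
            <ser:order>
                <xsd:price>1</xsd:price>
                <xsd:quantity>1</xsd:quantity>
                <xsd:symbol>IBM</xsd:symbol>
            </ser:order>
        </ser:placeOrder>
    </soapenv:Body>
</soapenv:Envelope>
```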
After 4 failed delivery attempts the message processor is deactivated. You can then start the backend service and resend the message from the management console.
In Out service Invocations with Message Forwarding Processor
This example shows how to do an in-out service invocation via the Message Forwarding Processor, using the same backend service as the previous example.
In this scenario we use the getSimpleQuote web service operation, which is an 'in out' operation. We reuse the endpoint from the previous scenario since we are invoking the same web service.
For this we need to create another Message Store. I will use the following configuration to create it. (Note that you also need to define the JMS queue name used here in the jndi.properties file.)
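A sketch of the second store follows, named JMSMS1 to match the store referenced by the InOutProxy configuration. It assumes a matching queue.JMSMS1=JMSMS1 entry in jndi.properties. As with the first store, the context factory class and provider URL are assumptions for a Qpid-based client.

```xml
<!-- Sketch of the second JMS-backed store; factory class and provider URL are assumptions -->
<messageStore xmlns="http://ws.apache.org/ns/synapse"
              class="org.wso2.carbon.message.store.persistence.jms.JMSMessageStore"
              name="JMSMS1">
    <parameter name="java.naming.factory.initial">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
    <parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
    <!-- Assumes queue.JMSMS1=JMSMS1 has been added to jndi.properties -->
    <parameter name="store.jms.destination">JMSMS1</parameter>
</messageStore>
```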
To implement this 'in out' scenario I’ll create another Message Forwarding Processor, which now has an additional parameter, message.processor.reply.sequence, that points to a sequence to handle the response message.
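A sketch of this second processor is shown below. It is bound to the JMSMS1 store and routes responses to the sequence named replySequence. The processor name, interval and implementation class name are assumptions for illustration.

```xml
<!-- Sketch of an in-out forwarding processor; name, class and interval are assumptions -->
<messageProcessor xmlns="http://ws.apache.org/ns/synapse"
                  class="org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor"
                  name="InOutForwardingProcessor"
                  messageStore="JMSMS1">
    <parameter name="interval">10000</parameter>
    <!-- Sequence that receives the backend response -->
    <parameter name="message.processor.reply.sequence">replySequence</parameter>
</messageProcessor>
```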
The sequence “replySequence” is a top-level sequence whose configuration looks like this.
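A minimal sketch of such a sequence follows. It logs the full response with a REPLY = MESSAGE property, consistent with the log output shown for this scenario. The drop mediator at the end is an assumption, included so that the logged response is not mediated any further.

```xml
<!-- Sketch of the reply sequence; the trailing drop mediator is an assumption -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="replySequence">
    <!-- Log the complete response envelope with a marker property -->
    <log level="full">
        <property name="REPLY" value="MESSAGE"/>
    </log>
    <drop/>
</sequence>
```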
To accept the messages and store them, the following proxy configuration can be used. (Note that the OUT_ONLY property is not set in this scenario.)
<proxy xmlns="http://ws.apache.org/ns/synapse" name="InOutProxy" transports="https,http" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" />
            <property name="target.endpoint" value="SimpleStockQuoteService" />
            <log level="full" />
            <store messageStore="JMSMS1" />
        </inSequence>
    </target>
</proxy>
When you invoke the proxy service with a getSimpleQuote request, you will see the service being invoked and the response being logged by the replySequence.
INFO - LogMediator To: /services/InOutProxy, WSAction: urn:getSimpleQuote, SOAPAction: urn:getSimpleQuote, MessageID: urn:uuid:dec12d9c-5289-476c-9d9a-b7bb7ebc7be4, Direction: request, REPLY = MESSAGE, Envelope:
change: -2.3141856129298564
earnings: 12.877155014054368
high: 172.73334579339183
last: 165.31090559096748
lastTradeTimestamp: Thu Dec 29 07:48:42 IST 2011
low: -164.80767926468306
marketCap: 9451314.231029626
name: IBM Company
open: -161.41234152690964
peRatio: 25.74977555860659
percentageChange: -1.2214036358135663
prevClose: 189.46935681818218
symbol: IBM
volume: 8611
Conclusion
In this article we discussed the Message Forwarding Processor and its use cases. We also looked into the JMSMessageStore and how to integrate it with WSO2 Message Broker.
References
[1] https://docs.wso2.org/display/ESB/Message+Forwarding+Processor
Author
Charith Wickramarachchi, Software Engineer, WSO2 Inc.