[Article] WSO2 Enterprise Integrator - Mediation, Service and Data Integration Under the Same Roof

  • By Hasitha Abeykoon
  • 12 Jul, 2017

WSO2 Enterprise Integrator consists of a number of components that together provide the features required for a complete integration use case. Because a single product combines this functionality, developers can learn and start using it faster, and features that span components are integrated out of the box. Message transformation, mediation, data services, and message brokering are some of the features highlighted in this article.

Scenario

Let’s consider an organization registering its customers through a web portal. Registration might take some time to process because it needs to persist information into the corporate database for later use. Thus, the party doing the registration doesn’t need to wait until registration completes, but the system needs to make sure all the registration data is saved in the order that it came in. Also, incoming messages need to be processed before being persisted to capture additional information such as the time of the request.

Below are the two types of request messages made by the web portal:

  1. Messages to add a customer
  2. Messages to add a company

These messages will come into WSO2 Enterprise Integrator via the HTTP protocol. The caller of the service should not be blocked until the information is processed and saved to the system. Rather, the service should “accept” it for processing. The following critical requirements must be met for this message processing:

  • The accepted message MUST be processed reliably.
  • Messages should be processed in the order they were accepted by the service. Customer requests and company requests should be treated individually.
  • Even if the backend systems are down, no message loss should happen. Eventually the accepted messages should be persisted in databases.

To address the above requirements while building the solution, we need the following building blocks, which WSO2 Enterprise Integrator provides out of the box:

  • Message transformation: Some fields are not provided by the user in the incoming message but are generated by the system. The system time is considered as such a parameter. Hence, some fields need to be added to the incoming request during mediation.
  • Message brokering: This is needed to accept and store messages until they are reliably persisted. Necessary acknowledgment patterns should be used to ensure guaranteed delivery.
  • Message filtering: A customer and a company are considered as two different types of data and stored separately. Only customer requests should have the system time. There should be a way to filter incoming messages to identify these two types of messages and apply the transformation as required.
  • Data services: Information is eventually stored in a database. The message should be picked up from the broker and stored by calling a service. WSO2 has a flexible way of handling this requirement using data services. A data service is used to expose operations on a database table as a SOAP or REST service. The WSO2 Enterprise Integrator mediation flow can invoke the data service and inspect its response to see whether the operation was executed successfully.

The following diagram illustrates the message flow of the scenario. All the components represented are within WSO2 Enterprise Integrator.

Figure 1

Creating necessary schema and table

MySQL is used as the database to store information.

  • Create a DB schema called “corporate_db”
    CREATE DATABASE corporate_db;
    
  • Create a table called “customer”
    CREATE TABLE `corporate_db`.`customer` (
      `id` INT NOT NULL AUTO_INCREMENT,
      `name` VARCHAR(45) NOT NULL,
      `request_time` VARCHAR(45) NOT NULL,
      `tp_number` VARCHAR(45) NULL,
      `address` VARCHAR(45) NULL,
      PRIMARY KEY (`id`));
    

    Figure 2

  • Create a table called “company”
    CREATE TABLE `corporate_db`.`company` (
      `id` INT NOT NULL AUTO_INCREMENT,
      `name` VARCHAR(45) NOT NULL,
      `register_country` VARCHAR(45) NULL,
      `employee_count` VARCHAR(45) NULL,
      `address` VARCHAR(45) NULL,
      PRIMARY KEY (`id`));
    

    Figure 3

Preparing the data service

To expose the above tables via a SOAP service, you need to create a data service using WSO2 Enterprise Integrator. Download the product and extract it. Let’s refer to the extracted directory as [EI_HOME] in this article. Please refer to Appendix 1 for the generated XML file representing the whole data service.

Configurations

  1. Copy the MySQL database driver to the [EI_HOME]/lib folder.
  2. Navigate to [EI_HOME]/bin folder and start the server using the ./integrator.sh script. Log into the management console using the credentials admin/admin.

Adding data source

  1. Navigate to Configure > Datasource and add a data source configuration pointing to the database schema created (use the necessary database user to connect).

    Figure 4

Writing queries

  1. Navigate to Main > Data Service > Create and follow the wizard to create the data service.
  2. Add the data source we just created as shown in the image below.

    Figure 5

  3. Add a query to insert the information as shown in the image below.

    Figure 6

  4. Add input mappings specifying the elements needed by the query.

    Figure 7

Writing operations

  1. Add an operation to represent the service that needs to be invoked in order to add customers and companies. Tick the Return Request Status checkbox so that the service replies with the request status when it is invoked.

    Figure 8

  2. When saved, the data service will be deployed. You can view it by navigating to the services view.

    Figure 9

Testing the service

  1. Try the service by clicking on Try this service. WSO2 Enterprise Integrator can auto-generate the needed request by looking at the WSDL of the generated service.

    Figure 10

Now we have the data service ready. The next section of the article describes how to design the mediation flow to match the message flow of the scenario depicted above.

Setting up the message broker

WSO2 Enterprise Integrator has a message broker profile. Once started, it can behave as a scalable, clusterable broker that addresses asynchronous message delivery and guaranteed delivery. WSO2 Message Broker supports the AMQP 0-9-1 protocol and the JMS 1.1 API specification. The mediation components of WSO2 Enterprise Integrator are written according to the JMS specification so that they can call the broker in a standard manner.

  1. Make a copy of the WSO2 Enterprise Integrator product and navigate to the bin folder.
  2. Start the product as a message broker by running ./broker.sh.
  3. All broker-related configurations can be found at [EI_HOME]/wso2/broker/conf/broker.xml

The broker is accessible via AMQP on port 5675 by default. WSO2 Enterprise Integrator’s broker profile persists messages into a file-based H2 database out of the box, but this can easily be changed to point to MySQL, Oracle or any other database. You can log into the management console of the broker at https://127.0.0.1:9446/carbon/.

Configuring WSO2 Enterprise Integrator message store and message processor

“Message store and forward” is a common integration pattern used to achieve asynchronous message delivery in enterprise integration scenarios. Message store and message processor are the relevant implementations in WSO2 Enterprise Integrator for the store and forward pattern. Let’s see how to configure them for guaranteed delivery.

Configuring the server

As this feature needs to communicate with the broker, we need to copy the required client libraries into the [EI_HOME]/lib folder. Shut down the WSO2 Enterprise Integrator server and copy the following two libraries to that location:

  • [EI_HOME]/wso2/broker/client-lib/andes-client-3.2.13.jar
  • [EI_HOME]/wso2/broker/client-lib/geronimo-jms_1.1_spec-1.1.0.wso2v1.jar

The pattern places messages on a JMS queue. In this scenario we will use two different queues for customer and company requests because their ordering must be maintained separately (the auto-generated ID is considered as the request key). The message store and message processor components locate the broker and the queues through the [EI_HOME]/conf/jndi.properties file, in which we need to specify the following:

  • Connection to the broker
  • Queues and their Java Naming and Directory Interface (JNDI) names

The edited file should look like below:

#
# Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
#queue.MyQueue = example.MyQueue
queue.customers = customers
queue.companies = companies

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.MyTopic

Note: For the new configuration changes to come into effect, start the server again. Log into the management console as described earlier.
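
A mistyped queue name in jndi.properties tends to surface only later as a connection or lookup error, so a quick sanity check before restarting can help. The following is a minimal Python sketch and not part of the product: the function name read_jndi_entries is hypothetical, and it assumes the file contains only simple `key = value` lines and `#` comments, as in the listing above.

```python
def read_jndi_entries(text):
    """Split jndi.properties content into connection factories and queues.

    Assumes plain `key = value` lines; lines starting with `#` are comments.
    """
    factories, queues = {}, {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only: the connection URL itself contains "=".
        key, value = (part.strip() for part in line.split("=", 1))
        if key.startswith("connectionfactory."):
            factories[key[len("connectionfactory."):]] = value
        elif key.startswith("queue."):
            queues[key[len("queue."):]] = value
    return factories, queues


# Sample content copied from the edited file shown above.
SAMPLE = """
# register some connection factories
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'
#queue.MyQueue = example.MyQueue
queue.customers = customers
queue.companies = companies
"""

factories, queues = read_jndi_entries(SAMPLE)
# The two queues this article relies on must both be registered.
missing = {"customers", "companies"} - set(queues)
print("connection factories:", sorted(factories))
print("missing queues:", sorted(missing))
```

To check a real installation, pass the contents of [EI_HOME]/conf/jndi.properties to the function instead of the sample text.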

Configuring the message store

Add two JMS message stores for customers and companies as shown below. Note how the configuration points to the jndi.properties file.

Figure 11

Let us name them “customerRequestStore” and “companyRequestStore” respectively. See Appendix 2 for XML-based configurations.

Configuring the data service endpoint

Navigate to Main > Service Bus > Endpoints and add an address endpoint for the deployed data service. You can find the URL from the service description page. In the mediation flow, whenever we need to send a message to a data service this endpoint can be used. Note that timeout values, etc. can be configured for this endpoint. See Appendix 3 for endpoint XML configurations.

Figure 12

We can also use the local transport here to avoid a network call, since the message flow happens inside the same JVM.

local:///services/request_record_service/

Configuring the message processor

The message processor polls messages from the configured message store, delivers them to the endpoint, and invokes the given sequence with the response. For example, customerRequestForwarder will poll messages from customerRequestStore and invoke the data service endpoint we defined earlier. To achieve this it works with two protocols: it receives messages using JMS and invokes the back-end data service over HTTP.

Forwarding message processors provide guaranteed delivery semantics out of the box: a message is acknowledged only if the back-end service was invoked successfully. Several behaviors can be configured here:

  • How many times the processor should try to deliver a message if an issue occurs.
  • How long the processor should wait before the next retry.
  • Which service-level HTTP status codes the message processor should treat as a service failure (e.g., if the service returns 500, 401, etc.).
  • Whether the processor should deactivate itself after retrying a specific number of times or continue retrying forever (it cannot move on to the next message, as that would break the message order).
  • Whether the message should be dropped after a specific number of delivery attempts.
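
To make the interplay of these settings concrete, here is a small Python model of a forwarding processor. It only illustrates the semantics described above and is not WSO2 code: the class ForwarderModel and its parameters max_attempts and drop_after_max are hypothetical names standing in for the corresponding processor settings, and retry intervals are left out.

```python
from collections import deque


class ForwarderModel:
    """Toy model of a store-and-forward processor (illustrative only)."""

    def __init__(self, backend, max_attempts=4, drop_after_max=False):
        self.queue = deque()            # stands in for the JMS queue
        self.backend = backend          # callable returning True on success
        self.max_attempts = max_attempts
        self.drop_after_max = drop_after_max
        self.active = True
        self.delivered = []
        self.dropped = []

    def store(self, message):
        self.queue.append(message)

    def activate(self):
        self.active = True

    def run(self):
        """Forward messages in order until the queue is empty or we deactivate."""
        while self.active and self.queue:
            message = self.queue[0]     # peek: remove only after success
            for _ in range(self.max_attempts):
                if self.backend(message):
                    self.delivered.append(self.queue.popleft())
                    break
            else:
                # All attempts failed for the message at the head of the queue.
                if self.drop_after_max:
                    self.dropped.append(self.queue.popleft())
                else:
                    self.active = False  # keep the message, preserve order


state = {"database_up": False, "attempts": 0}


def call_data_service(message):
    """Pretend back end: succeeds only while the database is up."""
    state["attempts"] += 1
    return state["database_up"]


processor = ForwarderModel(call_data_service)
for name in ["Smith", "Jones"]:
    processor.store(name)

processor.run()                         # back end down: retries, then deactivates
print(processor.active, list(processor.queue), state["attempts"])

state["database_up"] = True
processor.activate()
processor.run()                         # back end up again: delivered in order
print(processor.delivered)
```

The design point the model highlights is that the head of the queue is removed only after a successful delivery. Deactivating instead of skipping a failed message is what keeps the order intact; enabling the drop behavior trades that guarantee for progress.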

Figure 13: SOAP UI input and output for addCustomer scenario

Please refer to Appendix 4 for the XML configurations of the message processors. Once the configurations are saved, the message processors connect to the broker. You should not see any errors in the server logs when this happens.

If the management console of the broker is viewed now, two queues have been created, one for each store.

Figure 14: MySQL Workbench view of the table with inserted customer record

Configuring a sequence to drop a message

The data service responds with a message that we do not need to send to the caller. If it contains a “SUCCESS” message we can assume that the information given was stored in the database without an issue. If it does not, the message processor needs to retry. In this example we will log the response from the data service and then drop it (discard it and end the message flow).

<sequence name="DB_call_status_seq" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full">
        <property name="DB_STATUS_SEQ" value="REPLY_FROM_DB"/>
    </log>
    <drop/>
</sequence>

We need to specify this sequence as the “Reply sequence” of both message processors.

Configuring message transformation and mediation

Now that the required building blocks for the message flow are ready, it is a matter of wiring them up to build the message flow according to the requirement. There should be a proxy service to accept the message into the mediation.

  • The external party will invoke this proxy service with a customer or company request message. They should not wait for any response. Thus the correct implementation is to send an HTTP “202” Accepted response back to the caller. To do this we need to specify the property below in the proxy service.
     <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
    
  • After the message is received into the proxy service, we need to identify whether it’s a customer or company message in order to transform it into the format that the data service needs. The easiest way to do that is to check the elements in the message. A filter mediator can identify the message type using an XPath expression, and the necessary branching can be done in the mediation logic.
     <filter xpath="boolean(//addCustomer)">
        <then>
           <!-- mediation logic for customer messages -->
        </then>
        <else>
           <!-- mediation logic for company messages -->
        </else>
     </filter>
    
  • To transform the message, the payload factory mediator can be used. With it you can extract the necessary values from the incoming payload and construct the required message (the required format can be taken from the payload generated by the “Try this service” feature discussed in the sections above).
     <payloadFactory media-type="xml">
        <format>
           <p:addCustomer xmlns:p="http://ws.wso2.org/dataservice">
              <xs:name xmlns:xs="http://ws.wso2.org/dataservice">$1</xs:name>
              <xs:request_time xmlns:xs="http://ws.wso2.org/dataservice">$2</xs:request_time>
              <xs:tp_number xmlns:xs="http://ws.wso2.org/dataservice">$3</xs:tp_number>
              <xs:address xmlns:xs="http://ws.wso2.org/dataservice">$4</xs:address>
           </p:addCustomer>
        </format>
        <args>
           <arg evaluator="xml" expression="//name"/>
           <arg evaluator="xml" expression="get-property('SYSTEM_DATE', 'yyyy.MM.dd')"/>
           <arg evaluator="xml" expression="//tpNumber"/>
           <arg evaluator="xml" expression="//address"/>
        </args>
     </payloadFactory>
    

    Above, note how request_time is constructed by WSO2 Enterprise Integrator using a system-defined property. In the same way, construct the payload required for the addCompany operation as well. See Appendix 5 for a complete proxy service configuration.

  • Note that we are using the same endpoint to send both customer and company messages. The differentiator is the “Action” transport header that comes with the payload. Axis2 dispatches the message to the right operation by looking at the action header. Hence, you need to set the “Action” header to either addCustomer or addCompany as required.
    <header name="Action" scope="transport" value="addCustomer"/>
    
  • Finally, we need to place the message in the relevant message store. Internally, when the message is stored, a JMS message producer sends it across an AMQP connection to the configured queue.
    <store messageStore="customerRequestStore"/>
    

See Appendix 5 for the complete proxy service configuration. The whole message flow is now wired according to the solution above and is ready for testing.
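
The branching and transformation performed by the proxy can be summarized in a few lines of Python. This sketch only mirrors the logic for readers who want to reason about it outside the product; it is not how the mediation engine executes the configuration. The function name route_request is hypothetical, while the element names, the namespace and the store names are taken from the configuration in this article.

```python
import xml.etree.ElementTree as ET
from datetime import datetime

DS_NS = "http://ws.wso2.org/dataservice"


def route_request(body_xml, now=None):
    """Return (message store name, data service payload) for a request."""
    request = ET.fromstring(body_xml)
    now = now or datetime.now()

    def text_of(tag):
        # Rough counterpart of XPath expressions such as //name
        node = request.find(f".//{tag}")
        return node.text if node is not None else ""

    # Counterpart of the filter xpath boolean(//addCustomer)
    is_customer = any(True for _ in request.iter("addCustomer"))

    if is_customer:
        fields = [("name", text_of("name")),
                  ("request_time", now.strftime("%Y.%m.%d")),  # yyyy.MM.dd
                  ("tp_number", text_of("tpNumber")),
                  ("address", text_of("address"))]
        operation, store = "addCustomer", "customerRequestStore"
    else:
        fields = [("name", text_of("name")),
                  ("registerCountry", text_of("registerCountry")),
                  ("employeeCount", text_of("employeeCount")),
                  ("address", text_of("address"))]
        operation, store = "addCompany", "companyRequestStore"

    # Build the payload in the data service namespace, like the payload factory.
    payload = ET.Element(f"{{{DS_NS}}}{operation}")
    for tag, value in fields:
        ET.SubElement(payload, f"{{{DS_NS}}}{tag}").text = value
    return store, ET.tostring(payload, encoding="unicode")


store, payload = route_request(
    "<addCustomer><name>Smith</name><tpNumber>0834558649</tpNumber>"
    "<address>No. 456, Gregory Road, Los Angeles</address></addCustomer>",
    now=datetime(2017, 7, 12),
)
print(store)
print(payload)
```

Only the customer branch adds a system-generated field (request_time); the company branch simply copies the incoming values into the data service format.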

Testing

We can test the message flow once all components are communicating with each other without issues. Create a SOAP UI project from the WSDL of the proxy service (by specifying its URL).

Successful scenario

  1. Add a customer

    When invoked with the SOAP message below, we will see that a record is added into the “customer” table. Server logs will indicate how the message flow has happened.

    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
       <soapenv:Header/>
       <soapenv:Body>
            <addCustomer>
                <name>Smith</name>
                <tpNumber>0834558649</tpNumber>
                <address>No. 456, Gregory Road, Los Angeles</address>
            </addCustomer>
       </soapenv:Body>
    </soapenv:Envelope>  
    

    Figure 15

  2. Add a company

    When invoked with the SOAP request below a record will be added to the “company” table.

    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
       <soapenv:Header/>
       <soapenv:Body>
            <addCompany>
                <name>WSO2</name>
                <registerCountry>Sri Lanka</registerCountry>
                <employeeCount>500</employeeCount>
                <address>No. 20, Palm Grove, Colombo</address>
            </addCompany>
       </soapenv:Body>
    </soapenv:Envelope>
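
If SOAP UI is not at hand, the same requests can be sent from a short script. The following Python sketch uses only the standard library. Two values in it are assumptions rather than facts from this article: the proxy URL, which combines the proxy name RequestAcceptProxy with the host and port used for the data service endpoint in Appendix 3, and the SOAPAction value urn:mediate. Adjust both to your deployment. The helper names build_envelope and send are hypothetical.

```python
import urllib.request
from xml.sax.saxutils import escape

# Assumed URL: host and port as in Appendix 3, proxy name as in Appendix 5.
PROXY_URL = "http://localhost:8280/services/RequestAcceptProxy"


def build_envelope(operation, fields):
    """Wrap the given fields in a SOAP 1.1 envelope like the ones above."""
    body = "".join(
        f"<{tag}>{escape(str(value))}</{tag}>" for tag, value in fields.items()
    )
    return (
        '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">'
        "<soapenv:Header/>"
        f"<soapenv:Body><{operation}>{body}</{operation}></soapenv:Body>"
        "</soapenv:Envelope>"
    )


def send(envelope, url=PROXY_URL):
    """POST the envelope and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=envelope.encode("utf-8"),
        headers={
            "Content-Type": "text/xml; charset=UTF-8",
            "SOAPAction": '"urn:mediate"',  # assumed action, see lead-in
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


company = build_envelope("addCompany", {
    "name": "WSO2",
    "registerCountry": "Sri Lanka",
    "employeeCount": 500,
    "address": "No. 20, Palm Grove, Colombo",
})
print(company)
# With the server running, send(company) should return 202,
# because the proxy sets FORCE_SC_ACCEPTED.
```

The call to send is left commented out so that the script can be read and run without a server; once the proxy is deployed, a 202 status indicates that the message was accepted for processing, not that it has already been written to the database.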
    

Failure scenario

To mimic a failure scenario we can break the connection between the database and the WSO2 Enterprise Integrator server. Let us shut down the MySQL server and repeat the test. The expected behavior is described below.

  • The message processor will pick up the message from the store and try to invoke the data service. The data service will return an error as it cannot communicate with the database server.
  • This will happen four times, as four is the default maximum number of delivery attempts for the message processor.
  • Then the message processor will deactivate itself.
  • If the message processor is activated again using the management console, it will try another four times and deactivate again.
  • When the database server is started again and the message processor is activated, the record will be added to the database.

Figure 16

This failure scenario demonstrates the guaranteed delivery and ordered message processing capabilities of the solution.



Extending the solution

The above solution is for a single WSO2 Enterprise Integrator server node. Nodes can crash for various reasons, so high availability is important. The following are a few points to consider for a high-availability solution that delivers a more robust delivery guarantee.

  • WSO2 Enterprise Integrator can be fronted by a load balancer so that the RequestAcceptProxy has high availability.
  • If the message broker node is down, messages cannot be accepted. As a solution, a few WSO2 Enterprise Integrator nodes can be started as a cluster. When configuring the jndi.properties file, broker nodes can be specified as failover nodes. Then, if the main broker node is down, JMS traffic is automatically routed to the next node. However, ordered delivery is guaranteed only if the broker design supports it.
  • In a clustered environment, the message processor should only work on a particular node. If that node crashes, another node should take over the message processing work.
  • The data service can be fronted by a load balancer and high availability is achieved on that layer.
  • Database information can be replicated.

Conclusion

It is vital for an enterprise messaging system to tolerate failures, and such robust systems can be designed using WSO2 Enterprise Integrator. The product is designed to provide the capabilities needed to build a complete and comprehensive integration solution. Instead of learning several products that deliver different capabilities, the complete solution can be built using one product. Every aspect of the product is built on top of the award-winning WSO2 Carbon platform, which is proven to work and perform. Thus it is worthwhile to try out all these 100% open source features under the same roof.

Appendix

Appendix 1 (Data service)

<data name="request_record_service" transports="http https local">
   <description>Service for recording customer or company requests.</description>
   <config enableOData="false" id="coorporate_db">
      <property name="carbon_datasource_name">coorporate_db</property>
   </config>
   <query id="addCustomer" useConfig="coorporate_db">
      <sql>INSERT INTO customer (name, request_time, tp_number, address) VALUES (?,?,?,?)</sql>
      <param name="name" sqlType="STRING"/>
      <param name="request_time" sqlType="STRING"/>
      <param name="tp_number" sqlType="STRING"/>
      <param name="address" sqlType="STRING"/>
   </query>
   <query id="addCompany" useConfig="coorporate_db">
      <sql>INSERT INTO company (name, register_country, employee_count, address) VALUES (?,?,?,?)</sql>
      <param name="name" sqlType="STRING"/>
      <param name="registerCountry" sqlType="STRING"/>
      <param name="employeeCount" sqlType="INTEGER"/>
      <param name="address" sqlType="STRING"/>
   </query>
   <operation name="addCustomer" returnRequestStatus="true">
      <description>record customer</description>
      <call-query href="addCustomer">
         <with-param name="name" query-param="name"/>
         <with-param name="request_time" query-param="request_time"/>
         <with-param name="tp_number" query-param="tp_number"/>
         <with-param name="address" query-param="address"/>
      </call-query>
   </operation>
   <operation name="addCompany" returnRequestStatus="true">
      <description>record company</description>
      <call-query href="addCompany">
         <with-param name="name" query-param="name"/>
         <with-param name="registerCountry" query-param="registerCountry"/>
         <with-param name="employeeCount" query-param="employeeCount"/>
         <with-param name="address" query-param="address"/>
      </call-query>
   </operation>
</data>

Appendix 2 (Message stores)

customerRequestStore

<messageStore name="customerRequestStore" class="org.apache.synapse.message.store.impl.jms.JmsStore" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
   <parameter name="java.naming.provider.url">conf/jndi.properties</parameter>
   <parameter name="store.jms.destination">customers</parameter>
   <parameter name="store.jms.connection.factory">QueueConnectionFactory</parameter>
   <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
   <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
   <parameter name="store.failover.message.store.name">customerRequestStore</parameter>
</messageStore>

companyRequestStore

<messageStore name="companyRequestStore" class="org.apache.synapse.message.store.impl.jms.JmsStore" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
   <parameter name="java.naming.provider.url">conf/jndi.properties</parameter>
   <parameter name="store.jms.destination">companies</parameter>
   <parameter name="store.jms.connection.factory">QueueConnectionFactory</parameter>
   <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
   <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
   <parameter name="store.failover.message.store.name">customerRequestStore</parameter>
</messageStore>

Appendix 3 (Data service endpoint)

<endpoint xmlns="http://ws.apache.org/ns/synapse" name="request_record_service_ep">
   <address uri="http://localhost:8280/services/request_record_service">
      <suspendOnFailure>
         <progressionFactor>1.0</progressionFactor>
      </suspendOnFailure>
      <markForSuspension>
         <retriesBeforeSuspension>0</retriesBeforeSuspension>
         <retryDelay>0</retryDelay>
      </markForSuspension>
   </address>
</endpoint>

Appendix 4 (Message processors)

customerRequestForwarder

<messageProcessor name="customerRequestForwarder" class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" targetEndpoint="request_record_service_ep" messageStore="customerRequestStore" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="interval">1000</parameter>
   <parameter name="client.retry.interval">1000</parameter>
   <parameter name="max.delivery.attempts">4</parameter>
   <parameter name="message.processor.reply.sequence">DB_call_status_seq</parameter>
   <parameter name="is.active">true</parameter>
   <parameter name="max.delivery.drop">Disabled</parameter>
   <parameter name="member.count">1</parameter>
</messageProcessor>

companyRequestForwarder

<messageProcessor name="companyRequestForwarder" class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" targetEndpoint="request_record_service_ep" messageStore="companyRequestStore" xmlns="http://ws.apache.org/ns/synapse">
   <parameter name="interval">1000</parameter>
   <parameter name="client.retry.interval">1000</parameter>
   <parameter name="max.delivery.attempts">4</parameter>
   <parameter name="message.processor.reply.sequence">DB_call_status_seq</parameter>
   <parameter name="is.active">true</parameter>
   <parameter name="max.delivery.drop">Disabled</parameter>
   <parameter name="member.count">1</parameter>
</messageProcessor>

Appendix 5 (Proxy service configuration)

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="RequestAcceptProxy"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target>
      <inSequence>
         <log level="full">
            <property name="ACCEPT_PROXY" value="INCOMING_MESSAGE"/>
         </log>
         <filter xpath="boolean(//addCustomer)">
            <then>
               <payloadFactory media-type="xml">
                  <format>
                     <p:addCustomer xmlns:p="http://ws.wso2.org/dataservice">
                        <xs:name xmlns:xs="http://ws.wso2.org/dataservice">$1</xs:name>
                        <xs:request_time xmlns:xs="http://ws.wso2.org/dataservice">$2</xs:request_time>
                        <xs:tp_number xmlns:xs="http://ws.wso2.org/dataservice">$3</xs:tp_number>
                        <xs:address xmlns:xs="http://ws.wso2.org/dataservice">$4</xs:address>
                     </p:addCustomer>
                  </format>
                  <args>
                     <arg evaluator="xml" expression="//name"/>
                     <arg evaluator="xml" expression="get-property('SYSTEM_DATE', 'yyyy.MM.dd')"/>
                     <arg evaluator="xml" expression="//tpNumber"/>
                     <arg evaluator="xml" expression="//address"/>
                  </args>
               </payloadFactory>
               <header name="Action" scope="transport" value="addCustomer"/>
               <log level="full">
                  <property name="ACCEPT_PROXY" value="CUSTOMER_PAYLOAD_AFTER_TRANSFORM"/>
               </log>
               <store messageStore="customerRequestStore"/>
            </then>
            <else>
               <payloadFactory media-type="xml">
                  <format>
                     <p:addCompany xmlns:p="http://ws.wso2.org/dataservice">
                        <xs:name xmlns:xs="http://ws.wso2.org/dataservice">$1</xs:name>
                        <xs:registerCountry xmlns:xs="http://ws.wso2.org/dataservice">$2</xs:registerCountry>
                        <xs:employeeCount xmlns:xs="http://ws.wso2.org/dataservice">$3</xs:employeeCount>
                        <xs:address xmlns:xs="http://ws.wso2.org/dataservice">$4</xs:address>
                     </p:addCompany>
                  </format>
                  <args>
                     <arg evaluator="xml" expression="//name"/>
                     <arg evaluator="xml" expression="//registerCountry"/>
                     <arg evaluator="xml" expression="//employeeCount"/>
                     <arg evaluator="xml" expression="//address"/>
                  </args>
               </payloadFactory>
               <header name="Action" scope="transport" value="addCompany"/>
               <log level="full">
                  <property name="ACCEPT_PROXY" value="COMPANY_PAYLOAD_AFTER_TRANSFORM"/>
               </log>
               <store messageStore="companyRequestStore"/>
            </else>
         </filter>
         <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
      </inSequence>
   </target>
   <description/>
</proxy>