2012/01/18

Integrating Different Systems with WSO2 ESB

  • Amila Suriarachchi
  • Architect, Member, Management Committee - Data Technologies - WSO2

Introduction

WSO2 Enterprise Service Bus (WSO2 ESB) is a leading open source ESB based on Apache Synapse. Apache Synapse is in turn based on Apache Axis2, which provides pluggable transports, message formats, and message building support. The canonical form used by Apache Synapse is XML, so message builders create the XML representation from the original message, while message formatters convert the XML back into any custom message format. The Synapse mediation engine processes the messages at the ESB level. An ESB can receive messages from, and send messages to, four types of endpoints:

  • Use a standard transport with an XML message.
  • Use a standard transport with a non-standard text message format.
  • Use a non-standard transport with an XML message.
  • Use a non-standard transport with a non-standard binary message format.

The first type of endpoint uses standard transports such as HTTP, SMTP, POP3, IMAP, JMS, and FTP to send messages, and the message format can be SOAP or POX. Most of these transports and message formats are supported out of the box.

For the second type of endpoint, standard transports can be used with custom message formatters and builders.

The third type can be handled with existing message formatters and builders by plugging in custom transports. It is worth noting that in Axis2 the message format and the wire-level transport used to send the message are independent of each other. For instance, Axis2 can send SOAP or POX messages using the same message builder and formatter over different transports such as HTTP and SMTP. Message formatters and builders use input and output streams to build and serialize the messages.

The fourth type of endpoint, such as RMI, CORBA, JDBC, and BAPI, receives input parameters and provides output parameters in binary formats. Compared to the second and third types, the wire-level message format and the transport are coupled, and each uses its own specific format.

These endpoints can be published as proper web services using a technique called custom deployers [1] within the Carbon platform. The ESB can then talk to the published web services using HTTP and SOAP message formats. WSO2 Data Services Server (WSO2 DSS) is an example of using a custom deployer [1] to publish JDBC endpoints as a web service.

The rest of this article walks through a sample scenario that reads a file using the VFS transport, converts it to XML using a builder, and processes the message using WSO2 ESB.

Applies To

WSO2 ESB 4.0.0
WSO2 DSS 2.6.2

Contents

Sample scenario

Scenario Overview

Most current systems that involve batch processing use text files to communicate with other systems. This sample scenario therefore starts by processing such a text file in CSV format. WSO2 ESB provides the VFS transport to read files from the file system. Since the file is in CSV format, CSVFileBuilder is used to convert the file content to XML. The message is then received by the OrderProcessor proxy service, where the records are processed one by one. For each record, the customer name and email address are retrieved from the database using the customer ID, and an email is sent to the customer's email address. The outgoing mail message is formatted using EmailMessageFormatter. Finally, all the records are saved to the database as a batch using a data service deployed in WSO2 DSS. The following sections describe each component in detail. Sample code can be found here.

VFS transport

WSO2 ESB provides different transports to communicate with different systems. The VFS transport [2] reads and writes files on different file systems. WSO2 ESB inherits its transports from Apache Axis2, so in order to use a transport its receivers and senders have to be configured in axis2.xml. Service-specific parameters can then be specified at the service level.

Message Builder

Message builders create the XML representation from the incoming message format. Axis2 assumes that different message formats are associated with different content types, and hence message builders are registered against a content type. For VFS, users can define the content type using the transport.vfs.ContentType parameter and register the builder for it. The processDocument method of CSVFileBuilder looks like this. The input stream is the content stream of the received message, and the method should return the XML element equivalent to that input stream.


    public OMElement processDocument(InputStream inputStream,
                                     String contentType,
                                     MessageContext messageContext) throws AxisFault {

        SOAPFactory soapFactory = OMAbstractFactory.getSOAP11Factory();
        SOAPEnvelope soapEnvelope = soapFactory.getDefaultEnvelope();

        OMNamespace omNamespace = soapFactory.createOMNamespace("https://wso2.org/sample/shop/order", "ns1");

        OMElement batchRequestElement =
                soapFactory.createOMElement("AddOrder_batch_req", omNamespace);

        OMElement addOrderElement = null;

        OMElement orderIDElement = null;
        OMElement customerIDElement = null;
        OMElement dateElement = null;
        OMElement priceElement = null;

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        try {
            String readLine = bufferedReader.readLine();
            String[] values = null;
            while (readLine != null) {
                addOrderElement = soapFactory.createOMElement("AddOrder", omNamespace);
                values = readLine.split(",");

                //adding child elements.
                orderIDElement = soapFactory.createOMElement("orderID", omNamespace);
                orderIDElement.setText(values[0]);
                addOrderElement.addChild(orderIDElement);

                customerIDElement = soapFactory.createOMElement("customerID", omNamespace);
                customerIDElement.setText(values[1]);
                addOrderElement.addChild(customerIDElement);

                dateElement = soapFactory.createOMElement("date", omNamespace);
                dateElement.setText(values[2]);
                addOrderElement.addChild(dateElement);

                priceElement = soapFactory.createOMElement("price", omNamespace);
                priceElement.setText(values[3]);
                addOrderElement.addChild(priceElement);

                batchRequestElement.addChild(addOrderElement);
                readLine = bufferedReader.readLine();
            }

            soapEnvelope.getBody().addChild(batchRequestElement);
            return soapEnvelope;

        } catch (IOException e) {
            throw new AxisFault("Can not read the input stream", e);
        } finally {
            try {
                bufferedReader.close();
            } catch (IOException e) {
                System.out.println("Error in closing the reader");
            }
        }

    }
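
Two ideas from this section can be sketched without any Axis2 dependency: looking up a builder by content type, and parsing the CSV lines defensively. The listing above assumes every line has four fields, so a blank or short line would fail with an ArrayIndexOutOfBoundsException. The following is only an illustrative sketch in plain Java; the class name BuilderRegistrySketch, its methods, and the sample data are hypothetical and are not part of Axis2 or of the sample code.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration; none of these types are Axis2 classes.
final class BuilderRegistrySketch {

    // Content type -> builder, mirroring the idea of one builder per content type.
    static final Map<String, Function<InputStream, List<String[]>>> BUILDERS = new HashMap<>();

    static {
        BUILDERS.put("text/csv", BuilderRegistrySketch::parseCsv);
    }

    // Reads the stream line by line, skips blank lines, and rejects any line
    // that does not have exactly four fields (orderID, customerID, date, price).
    static List<String[]> parseCsv(InputStream in) {
        List<String[]> rows = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                if (line.trim().isEmpty()) {
                    continue; // tolerate blank lines, for example a trailing newline
                }
                String[] values = line.split(",", -1);
                if (values.length != 4) {
                    throw new IllegalArgumentException(
                            "Line " + lineNumber + " has " + values.length + " fields, expected 4");
                }
                rows.add(values);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    // Looks up the builder registered for the content type and applies it.
    static List<String[]> build(String contentType, InputStream in) {
        Function<InputStream, List<String[]>> builder = BUILDERS.get(contentType);
        if (builder == null) {
            throw new IllegalArgumentException("No builder registered for " + contentType);
        }
        return builder.apply(in);
    }

    public static void main(String[] args) {
        // Made-up sample data with a blank line in the middle.
        String csv = "1001,C01,2012-01-18,25.50\n\n1002,C02,2012-01-18,10.00\n";
        InputStream in = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8));
        for (String[] row : build("text/csv", in)) {
            System.out.println(String.join(" | ", row));
        }
    }
}
```

In a real builder the same checks would sit inside processDocument, with the failure reported as an AxisFault rather than an IllegalArgumentException.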

Proxy service

A proxy service is the means of adding message processing in WSO2 ESB. It mainly consists of two sequences, called in and out. Each sequence has a set of mediators that perform the various processing steps on the message. In this sample, the in sequence first clones the incoming message. This is required because, after the iteration, the whole message is sent to the data service endpoint, and by default Synapse does not cache or build the message when sending. The iterate mediator is then used to iterate through each record. For each record, it first fetches the other details of the customer using the DBLookup mediator. Then, using the XSLT mediator, it transforms the message and sends it to the customer's mail address. Finally, the whole message is saved using a data service.

<proxy xmlns="http://ws.apache.org/ns/synapse" name="OrderProcessor" transports="https http vfs" startOnLoad="true" trace="disable">
    <target>
        <inSequence>
            <log level="full"/>
            <property name="OUT_ONLY" value="true"/>
            <clone continueParent="true" sequential="true">
                <target>
                    <sequence>
                        <iterate xmlns:sn="https://wso2.org/sample/shop/order" id="orderIterator" expression="//sn:AddOrder_batch_req/sn:AddOrder" sequential="true">
                            <target>
                                <sequence>
                                    <log level="full"/>
                                    <dblookup>
                                        <connection>
                                            <pool>
                                                <password>root</password>
                                                <user>root</user>
                                                <url>jdbc:mysql://localhost:3306/SHOP_DB</url>
                                                <driver>com.mysql.jdbc.Driver</driver>
                                            </pool>
                                        </connection>
                                        <statement>
                                            <sql>select EMAIL_C,NAME_C from CUSTOMER_T where CUSTOMER_ID_C = ?</sql>
                                            <parameter expression="//sn:AddOrder/sn:customerID" type="VARCHAR"/>
                                            <result name="email" column="EMAIL_C"/>
                                            <result name="name" column="NAME_C"/>
                                        </statement>
                                    </dblookup>
                                    <log level="custom">
                                        <property name="email" expression="get-property('email')"/>
                                        <property name="name" expression="get-property('name')"/>
                                    </log>
                                    <xslt key="orderTransformer">
                                        <property name="email" expression="get-property('email')"/>
                                        <property name="name" expression="get-property('name')"/>
                                    </xslt>
                                    <log level="full"/>
                                    <property name="messageType" value="text/csv" scope="axis2"/>
                                    <header name="To" expression="fn:concat('mailto:', get-property('email'))"/>
                                    <send/>
                                </sequence>
                            </target>
                        </iterate>
                    </sequence>
                </target>
            </clone>
            <log level="full"/>
            <property name="messageType" value="application/soap+xml" scope="axis2"/>
            <send>
                <endpoint>
                    <address uri="https://localhost:9764/services/OrderService"/>
                </endpoint>
            </send>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        </inSequence>
    </target>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.PollInterval">15</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///sample/data/processed</parameter>
    <parameter name="transport.vfs.FileURI">file:///sample/data</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:///sample/data/failure</parameter>
    <parameter name="transport.vfs.FileNamePattern">.*\.txt</parameter>
    <parameter name="transport.vfs.ContentType">text/csv</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>
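
The transport.vfs parameters at the end of the proxy describe a poll, process, and move cycle: files in the FileURI directory whose names match FileNamePattern are handed to the service, then moved to the MoveAfterProcess or MoveAfterFailure location. The sketch below illustrates that cycle with plain java.nio.file calls. It is not the VFS transport implementation; the class name PollAndMoveSketch, the pollOnce method, and the temporary directories are assumptions made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Plain-Java illustration of one poll cycle; not the VFS transport itself.
final class PollAndMoveSketch {

    // Hands every regular file in 'inbox' whose name matches 'namePattern' to
    // 'processor', then moves it to 'processed' on success or to 'failure' if
    // the processor throws. Returns the number of successfully processed files.
    static int pollOnce(Path inbox, Path processed, Path failure,
                        Pattern namePattern, Consumer<Path> processor) {
        try {
            // Collect matching files first so that moving them does not
            // interfere with the directory listing.
            List<Path> candidates = new ArrayList<>();
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(inbox)) {
                for (Path entry : entries) {
                    String name = entry.getFileName().toString();
                    // Subdirectories (such as processed/ and failure/) are skipped.
                    if (Files.isRegularFile(entry) && namePattern.matcher(name).matches()) {
                        candidates.add(entry);
                    }
                }
            }
            int succeeded = 0;
            for (Path file : candidates) {
                Path targetDir;
                try {
                    processor.accept(file);
                    targetDir = processed;
                    succeeded++;
                } catch (RuntimeException e) {
                    targetDir = failure;
                }
                Files.move(file, targetDir.resolve(file.getFileName()),
                        StandardCopyOption.REPLACE_EXISTING);
            }
            return succeeded;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for /sample/data and its subdirectories.
        Path inbox = Files.createTempDirectory("data");
        Path processed = Files.createDirectory(inbox.resolve("processed"));
        Path failure = Files.createDirectory(inbox.resolve("failure"));
        Files.write(inbox.resolve("orders.txt"), "1001,C01,2012-01-18,25.50\n".getBytes());
        Files.write(inbox.resolve("notes.log"), "ignored\n".getBytes());

        int count = pollOnce(inbox, processed, failure, Pattern.compile(".*\\.txt"),
                file -> System.out.println("Processing " + file.getFileName()));
        System.out.println(count + " file(s) moved to " + processed);
    }
}
```

A real transport would repeat this cycle at every poll interval; the sketch runs it once so the effect of each parameter is easy to follow.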

Message Formatter

Message formatters create specific message formats from the XML model. Like builders, message formatters are selected by content type: Axis2 picks the message formatter to use for a message based on its content type. By default, Synapse uses the content type of the received message for the outgoing message. This can be changed using the messageType property. Axis2 passes the output stream of the transport sender to the message formatter so that it can write to the stream directly. Either the writeTo or the getBytes method is invoked, depending on the transport.

    public byte[] getBytes(MessageContext messageContext,
                           OMOutputFormat omOutputFormat) throws AxisFault {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        writeTo(messageContext, byteArrayOutputStream);
        return byteArrayOutputStream.toByteArray();
    }

    public void writeTo(MessageContext messageContext,
                        OMOutputFormat omOutputFormat,
                        OutputStream outputStream, boolean b) throws AxisFault {
        writeTo(messageContext, outputStream);

    }

    private void writeTo(MessageContext messageContext, OutputStream outputStream) throws AxisFault {
        SOAPEnvelope soapEnvelope = messageContext.getEnvelope();
        OMElement addOrderElement = soapEnvelope.getBody().getFirstElement();

        String orderID = addOrderElement.getFirstChildWithName(
                new QName("https://wso2.org/sample/shop/order", "orderID")).getText();
        String email = addOrderElement.getFirstChildWithName(
                new QName("https://wso2.org/sample/shop/order", "email")).getText();
        String name = addOrderElement.getFirstChildWithName(
                new QName("https://wso2.org/sample/shop/order", "name")).getText();
        String date = addOrderElement.getFirstChildWithName(
                new QName("https://wso2.org/sample/shop/order", "date")).getText();
        String price = addOrderElement.getFirstChildWithName(
                new QName("https://wso2.org/sample/shop/order", "price")).getText();

        try {
            outputStream.write("Order ID : ".getBytes());
            outputStream.write(orderID.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Email : ".getBytes());
            outputStream.write(email.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Name : ".getBytes());
            outputStream.write(name.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Date : ".getBytes());
            outputStream.write(date.getBytes());
            outputStream.write("\n".getBytes());

            outputStream.write("Price : ".getBytes());
            outputStream.write(price.getBytes());
            outputStream.write("\n".getBytes());


        } catch (IOException e) {
            throw new AxisFault("Can not write to the output stream", e);
        }

    }
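
One detail worth illustrating: the formatter above calls String.getBytes() without a charset, so the bytes written depend on the platform default encoding. The sketch below shows the same label-and-value layout written through an OutputStreamWriter with an explicit UTF-8 charset. It uses only JDK classes; the class name MailBodySketch and the field map are hypothetical stand-ins for the values the real formatter reads from the SOAP body.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration; this class does not implement the Axis2 MessageFormatter interface.
final class MailBodySketch {

    // Writes one "Label : value" line per entry, in insertion order, as UTF-8.
    static void writeTo(Map<String, String> fields, OutputStream out) {
        try {
            Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
            for (Map.Entry<String, String> field : fields.entrySet()) {
                writer.write(field.getKey() + " : " + field.getValue() + "\n");
            }
            writer.flush(); // flush the encoder, but leave the caller's stream open
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Byte-array variant that delegates to writeTo, as the formatter above does.
    static byte[] getBytes(Map<String, String> fields) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeTo(fields, buffer);
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Made-up values; a LinkedHashMap keeps the lines in a predictable order.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("Order ID", "1001");
        fields.put("Email", "customer@example.com");
        fields.put("Name", "Sample Customer");
        fields.put("Date", "2012-01-18");
        fields.put("Price", "25.50");
        System.out.print(new String(getBytes(fields), StandardCharsets.UTF_8));
    }
}
```

The stream is flushed but not closed because, as described above, it belongs to the transport sender.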

Mail transport

The mail transport is another transport available with WSO2 ESB. The mail transport [3] uses the SMTP protocol to send messages and supports both the POP3 and IMAP protocols to receive messages. In order to send a message using the mail transport, its sender must be configured in axis2.xml. Then, at the proxy service level, a mail address can be given as an endpoint.

Data Service

WSO2 DSS lets users access database data using XML messages. It provides this feature by using a custom deployer to create Axis2 services and using JDBC to communicate with the database. WSO2 DSS also provides a feature called batch processing, which updates the database using a set of SQL statements. This sample uses that feature to upload all the data in the file to the database with one message.

<data name="OrderService" enableBatchRequests="true" serviceNamespace="https://wso2.org/sample/shop/order">
   <config id="shopdatasource">
      <property name="org.wso2.ws.dataservice.driver">com.mysql.jdbc.Driver</property>
      <property name="org.wso2.ws.dataservice.protocol">jdbc:mysql://localhost:3306/SHOP_DB</property>
      <property name="org.wso2.ws.dataservice.user">root</property>
      <property name="org.wso2.ws.dataservice.password">root</property>
   </config>
   <query id="addOrderQuery" useConfig="shopdatasource">
      <sql>insert into ORDER_T values (:orderID, :customerID, :date,:price);</sql>
      <param name="orderID" sqlType="STRING" />
      <param name="customerID" sqlType="STRING" />
      <param name="date" sqlType="TIMESTAMP" />
      <param name="price" sqlType="DOUBLE" />
   </query>
   <operation name="AddOrder">
      <call-query href="addOrderQuery">
         <with-param name="orderID" query-param="orderID" />
         <with-param name="customerID" query-param="customerID" />
         <with-param name="date" query-param="date" />
         <with-param name="price" query-param="price" />
      </call-query>
   </operation>
</data>
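
For readers unfamiliar with batching, the general JDBC idea of queuing many inserts and executing them together can be sketched as follows. This is not WSO2 DSS code and makes no claim about how DSS implements batch requests. The class BatchInsertSketch and its methods are hypothetical, the positional SQL is an assumed equivalent of the named-parameter query above, and the date format "yyyy-MM-dd HH:mm:ss" is an assumption, since the article does not show the CSV data.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

// Plain-JDBC illustration of batching; not part of WSO2 DSS.
final class BatchInsertSketch {

    // Positional form of the insert statement (assumed equivalent of the query above).
    static final String INSERT_SQL = "insert into ORDER_T values (?, ?, ?, ?)";

    // Converts one record into typed parameters matching the declared
    // sqlType values: STRING, STRING, TIMESTAMP, DOUBLE.
    // Assumes the date is written as "yyyy-MM-dd HH:mm:ss".
    static Object[] toParams(String[] fields) {
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields, got " + fields.length);
        }
        return new Object[] {
                fields[0],
                fields[1],
                Timestamp.valueOf(fields[2]),
                Double.valueOf(fields[3])
        };
    }

    // Queues every row on a single PreparedStatement and executes them as one batch.
    // The caller supplies the connection and is responsible for closing it.
    static int[] insertBatch(Connection connection, List<Object[]> rows) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) {
            for (Object[] row : rows) {
                statement.setString(1, (String) row[0]);
                statement.setString(2, (String) row[1]);
                statement.setTimestamp(3, (Timestamp) row[2]);
                statement.setDouble(4, (Double) row[3]);
                statement.addBatch();
            }
            return statement.executeBatch();
        }
    }

    public static void main(String[] args) {
        // No database is contacted here; this only shows the parameter conversion.
        Object[] params = toParams(new String[] {"1001", "C01", "2012-01-18 10:15:00", "25.50"});
        System.out.println(Arrays.toString(params));
    }
}
```

Calling insertBatch requires a live connection and a JDBC driver on the classpath, which is why main only exercises the conversion step.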

Conclusion

Enterprise software systems use different technologies, protocols, and message formats to communicate with each other. An ESB addresses this problem by providing message mediation, transformation techniques, and message processing features. WSO2 ESB supports pluggable transports in order to communicate with different systems, while supporting almost all the standard transports out of the box. Message formatters and builders convert between different message formats and the XML model used in the ESB. If access to an endpoint is available as an API, or the invocation is similar to a method call, the endpoint can be exposed as a web service using a custom deployer. This technique also allows other integration-layer technologies, such as BPS, to communicate with those endpoints directly.

References

[1] https://wso2.org/library/articles/extending-axis2

[2] https://wso2.org/project/esb/java/4.0.0/docs/samples/transport_samples.html#Sample254

[3] https://wso2.org/project/esb/java/4.0.0/docs/samples/transport_samples.html#Sample255

Author

Amila Suriarachchi, Software Architect, WSO2 Inc.