Creating Custom Data Publishers to BAM/CEP

  • By Sriskandarajah Suhothayan
  • 24 Jul, 2012

Applies To

WSO2 BAM 2.0.0 and above
WSO2 CEP 2.0.0 and above


Data is a critical part of any SOA environment. Aggregating data for analysis and streaming data for event processing are two of the most important areas of data processing. In the WSO2 platform, Business Activity Monitoring (BAM) is the data aggregation point: it aggregates data, analyzes it and presents it in a user-friendly manner. Complex Event Processing (CEP) handles event processing, identifying and notifying occurrences of meaningful event patterns in real time. In both scenarios we usually collect huge amounts of data from various data collection points, such as the ESB, application servers and custom data publishers, and pump it to servers such as CEP and BAM. The Data Bridge facilitates this data transfer in an effective and efficient manner.

In this article I’ll explain the architecture of the Data Bridge Thrift Agent component, how it behaves, and how we can create a Custom Data Publisher that publishes data to the back-end Data Receiver of the Data Bridge, which in turn dispatches the data to its subscribers such as CEP and BAM.


Component Architecture

Agent Client Component Architecture

Here the clients send events via the Data Publishers, and all those Data Publishers use the same Agent, which in turn governs data publishing.

Agent - Only one Agent should be used per JVM and it will be shared among all the Data publishers. When tuning for performance we need to alter the Agent configuration to optimize memory usage and throughput.

DataPublisher - There will be one Data Publisher per receiver configuration, identified uniquely by the data receiver endpoint and the username used to publish data.

EventPublisher - The Event publisher will convert the events to thrift or other appropriate data format, aggregate the converted events and send events to the Data Receiver of the Data Bridge as Event bundles in an asynchronous manner.

EventQueue - Event Queue is used as a buffer to aggregate events that need to be sent to an endpoint.

Threadpool - The thread pool of the Agent will be used by all the data publishers to send events to the data receivers.
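To make the roles listed above more concrete, here is a minimal, self-contained sketch of the buffering idea in plain Java. It is not the Data Bridge implementation: MiniAgent, MiniPublisher and bundleSize are hypothetical names, events are plain strings, and the "send" is just an in-memory list standing in for the network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the Agent: it owns the thread pool shared by all publishers.
class MiniAgent {
    final ExecutorService threadPool = Executors.newFixedThreadPool(2);

    // Stop accepting work and wait for the buffered events to be sent.
    void shutdown() {
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical model of one publisher: one event queue, drained in bundles.
class MiniPublisher {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<List<String>> sentBundles = new CopyOnWriteArrayList<>();
    private final MiniAgent agent;
    private final int bundleSize;

    MiniPublisher(MiniAgent agent, int bundleSize) {
        this.agent = agent;
        this.bundleSize = bundleSize;
    }

    // Returns immediately: the event is buffered and sent later by an Agent thread.
    void publish(String event) {
        eventQueue.add(event);
        agent.threadPool.execute(this::drain);
    }

    // Runs on an Agent thread: groups up to bundleSize buffered events into one bundle.
    private synchronized void drain() {
        List<String> bundle = new ArrayList<>();
        eventQueue.drainTo(bundle, bundleSize);
        if (!bundle.isEmpty()) {
            sentBundles.add(bundle); // stand-in for the network send
        }
    }

    List<List<String>> sentBundles() {
        return sentBundles;
    }
}
```

In this sketch several MiniPublisher instances can be given the same MiniAgent, so they compete for one bounded set of threads instead of each creating their own, which is the reason given above for using a single Agent per JVM.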

Connecting To Backend

The Thrift Data Publisher (client) sends events to the Data Bridge via Thrift over the TCP transport. When starting the server you may notice that the Data Receiver of the Data Bridge exposes two Thrift ports.

The relevant server logs are given here:

    [2012-06-12 22:47:22,916]  INFO {org.wso2.carbon.databridge.receiver.thrift.internal.ThriftDataReceiver} -  Thrift SSL port : 7711
    [2012-06-12 22:47:22,932]  INFO {org.wso2.carbon.databridge.receiver.thrift.internal.ThriftDataReceiver} -  Thrift port : 7611

Here the “Thrift SSL port” should be used for secure data transmission and the “Thrift port” for non-secure data transfer. To access the secure port and send data, connect to the server with the following url format:

    ssl://<IP address of the server>:<Thrift SSL port>

E.g. ssl://

Similarly, for non-secure data transfer we need to use the following url format:

    tcp://<IP address of the server>:<Thrift port>

E.g tcp://

Further, by default the server starts with the Thrift port 7611 and the Thrift SSL port 7711. Unless the server configuration is changed, the Thrift SSL port is equal to the Thrift port + 100.
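The default port relationship can be expressed as a small helper. The sketch below is illustrative only: ThriftUrls and defaultSslUrl are hypothetical names and not part of the Data Bridge API. It uses only java.net.URI from the JDK and assumes the server keeps the default offset of 100.

```java
import java.net.URI;

// Hypothetical helper, not part of the Data Bridge API.
class ThriftUrls {
    // Default gap between the Thrift port and the Thrift SSL port.
    static final int SSL_PORT_OFFSET = 100;

    // Derives the default "ssl://host:port" url from a "tcp://host:port" url.
    static String defaultSslUrl(String tcpUrl) {
        URI uri = URI.create(tcpUrl);
        if (!"tcp".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected tcp://<host>:<port>, got " + tcpUrl);
        }
        return "ssl://" + uri.getHost() + ":" + (uri.getPort() + SSL_PORT_OFFSET);
    }
}
```

For example, ThriftUrls.defaultSslUrl("tcp://localhost:7611") yields ssl://localhost:7711. If the server configuration has been changed, the offset no longer applies and both urls have to be supplied explicitly.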

Initializing DataPublisher

For the simple case, where there is a single Data Publisher per JVM:

1. Normal data transfer

Here we can initialize the Data Publisher as below:

    DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin");

Here we provide the url of the “Thrift port” that is used to transfer events, along with the username and password. In this mode client authentication is done securely over ssl, but the events are transferred over an unsecured tcp channel. For authentication in this default mode, the client connects to the server with the url ssl://localhost:7711: it uses the same host that is used for sending events and calculates the “Thrift SSL port” as “Thrift port” (7611) + 100.

If the server is not configured in the default mode we can use the following API:

    DataPublisher dataPublisher = new DataPublisher("ssl://localhost:7711", "tcp://localhost:7611", "admin", "admin");

Here we provide the url of the “Thrift SSL port” for authentication, the url of the “Thrift port” for data transmission, the username and the password.

Note that in this case the authentication url should have the protocol “ssl” to enforce transport level security, and it should point to the “Thrift SSL port”.

2. Secure data transfer

Here we can initialize the Data Publisher as:

    DataPublisher dataPublisher = new DataPublisher("ssl://localhost:7711", "admin", "admin");

Here we provide the url of the “Thrift SSL port”, the username and the password. In this mode both client authentication and data transfer are done securely using ssl. The client therefore connects to the server with the url ssl://localhost:7711 and uses the “Thrift SSL port” for both authentication and data transfer.

This is similar to using:

    DataPublisher dataPublisher = new DataPublisher("ssl://localhost:7711", "ssl://localhost:7711",
                                                    "admin", "admin");

For cases where we need many Data Publishers in the same JVM.

There might be scenarios where we need more than one Data Publisher in the same JVM. This can occur when we need to send events to various data receiving endpoints or when different users are trying to send events.

In this case, to optimize performance, we have to use many Data Publishers at the same time; this lets us maintain the connections to the various endpoints and avoid the overhead of the initial handshake. Here, we first need to initialize the Agent and then pass the same Agent to all the Data Publishers.


    Agent agent = new Agent();
    DataPublisher dataPublisher1 = new DataPublisher("tcp://localhost:7611", "admin", "admin", agent);
    DataPublisher dataPublisher2 = new DataPublisher("tcp://localhost:7612", "John", "password", agent);
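Since a Data Publisher is tied to one receiver endpoint and one username, an application that publishes to several endpoints may want to create each publisher once and reuse it. One possible way to do that is sketched below in plain Java. PublisherRegistry is a hypothetical helper, not part of the Agent API; it is generic so that it stays self-contained, and the factory passed to it is where the real constructor call would go.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical helper: caches one publisher per (receiver url, username) pair.
class PublisherRegistry<P> {
    private final Map<List<String>, P> publishers = new ConcurrentHashMap<>();
    private final BiFunction<String, String, P> factory;

    // factory receives (receiverUrl, username) and creates the publisher.
    PublisherRegistry(BiFunction<String, String, P> factory) {
        this.factory = factory;
    }

    // Returns the cached publisher, creating it on first use only.
    P get(String receiverUrl, String username) {
        List<String> key = Arrays.asList(receiverUrl, username);
        return publishers.computeIfAbsent(key, k -> factory.apply(receiverUrl, username));
    }

    int size() {
        return publishers.size();
    }
}
```

With the real library, the factory would presumably wrap the four-argument DataPublisher constructor shown above, pass the single shared Agent to it, and handle whatever checked exceptions that constructor declares.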

Defining Streams

Before sending events we need to define the stream that will carry the data. Data Bridge can use various Stream Definition Stores, and how often streams must be defined depends on the store. If it uses a persistent stream definition store, such as the Cassandra Stream Definition Store, we only need to define the stream once and can reuse it later. But if it is using an In Memory Stream Definition Store, we need to define the stream again after each server restart.

Here I explain how we can define the stream from the client.

1. Defining the stream definition as a JSON


    String streamId = dataPublisher.defineStream("{" +
                                                 " 'name':'org.wso2.esb.MediatorStatistics'," +
                                                 " 'version':'2.3.0'," +
                                                 " 'nickName': 'Stock Quote Information'," +
                                                 " 'description': 'Some Desc'," +
                                                 " 'tags':['Stock', 'Mediator']," +
                                                 " 'metaData':[" +
                                                 "       {'name':'ipAdd','type':'STRING'}" +
                                                 " ]," +
                                                 " 'correlationData':[" +
                                                 "       {'name':'correlationId','type':'STRING'}" +
                                                 " ]," +
                                                 " 'payloadData':[" +
                                                 "       {'name':'symbol','type':'STRING'}," +
                                                 "       {'name':'price','type':'DOUBLE'}," +
                                                 "       {'name':'volume','type':'INT'}," +
                                                 "       {'name':'max','type':'DOUBLE'}," +
                                                 "       {'name':'min','type':'DOUBLE'}" +
                                                 " ]" +
                                                 "}");

Here the user should provide a unique stream name and version. In addition, the user can provide a nickname to identify the stream, a description, and tags.

When defining streams, as with SQL tables, we also need to define the attributes (their names and types) that will be sent via the stream. The attribute type can be one of String, Int, Long, Double, Float and Bool.

The Data Publisher also lets the user manage meta, correlation and payload data separately. This is supported by grouping the attributes into three predefined logical partitions: metaData, correlationData and payloadData.

When defining streams, the dataPublisher.defineStream() method always returns a streamId, and this streamId must be used later when sending events to the server.

2. Defining the stream definition as a POJO Object


    String streamId = dataPublisher.defineStream(streamDefinition);

This is similar to the above method, but here we need to populate a StreamDefinition object and pass it to the DataPublisher.

Finding Stream Id

If the stream is already defined, the client can get the streamId from the server by calling the dataPublisher.findStream("streamName", "streamVersion") method.

Redefining the same stream

If we redefine the same stream, the server checks the stream name, the stream version and the meta, correlation and payload data attributes. If they exactly match an earlier definition, it returns the same streamId that was defined before. If any of the meta, correlation or payload data attributes differ from the earlier definition, it throws a DifferentStreamDefinitionAlreadyDefinedException.

Versioning streams

If we need to add or remove certain attributes, we can change the attributes and define the stream with a different version while keeping the same stream name. In this case the generated streamId will be different from the earlier one.
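The redefinition and versioning rules can be modelled in a few lines. The following is only a sketch of the described behaviour, not the server's stream definition store: MiniStreamStore is a hypothetical class, the streamId format used here is an assumption, and an IllegalStateException stands in for DifferentStreamDefinitionAlreadyDefinedException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the rules above; not the Data Bridge store.
class MiniStreamStore {
    private final Map<String, List<String>> attributesById = new HashMap<>();

    // attributes: "name:TYPE" entries for meta, correlation and payload data, in order.
    String defineStream(String name, String version, List<String> attributes) {
        // The id format is an assumption of this sketch.
        String streamId = name + "-" + version;
        List<String> existing = attributesById.get(streamId);
        if (existing == null) {
            // First definition of this name and version.
            attributesById.put(streamId, new ArrayList<>(attributes));
        } else if (!existing.equals(attributes)) {
            // Stand-in for DifferentStreamDefinitionAlreadyDefinedException.
            throw new IllegalStateException("Different definition already exists for " + streamId);
        }
        // Identical redefinitions fall through and get the same id back.
        return streamId;
    }
}
```

Defining the same name, version and attributes twice returns the same id, changing an attribute under the same version is rejected, and changing the version produces a new id for the same stream name.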

Sending Events

The events sent will have the following format

Event format

StreamId - This is a string value that uniquely represents the stream. The StreamId can be obtained from the dataPublisher.findStream() or dataPublisher.defineStream() methods.

TimeStamp - This is an optional long value. If the TimeStamp value is omitted, the server is expected to populate it with the event arrival time at the backend server (CEP/BAM).

MetaData - This is an Object array that contains the meta data related to the event

CorrelationData - This is an Object array that contains the correlation data related to the event

PayloadData - This is an Object array that contains the payload data related to the event

We can use the following method to send events:

    dataPublisher.publish(streamId, timeStamp, 
                          new Object[]{""}, new Object[]{"AFD5"}, new Object[]{"IBM",96.8,300,120.6,70.4});

Note that it is important for users to make sure that they send events according to the stream definitions they have already defined, and that the data arrays are always passed in the order meta, correlation and payload. This is because, in order to achieve higher performance, the Data Publisher and Data Bridge do not validate events against their definitions during data transfer. Therefore, if we send events that do not adhere to their stream definitions, they might end up throwing exceptions or passing garbage values to the backend servers. Further, the Data Publisher can automatically reconnect when its server session has expired. In this case it will try to reconnect three times before failing.
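Because events are not validated in transit, a client may choose to check its own data arrays before publishing, at least during development. The sketch below shows one way this could look. EventValidator is a hypothetical helper, not part of the Data Publisher, and the mapping from attribute types to Java classes (for example Int to Integer and Bool to Boolean) is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical client-side check; the Data Publisher itself does not do this.
class EventValidator {
    // Attribute types named in the article, mapped to the Java classes assumed here.
    enum Type {
        STRING(String.class), INT(Integer.class), LONG(Long.class),
        DOUBLE(Double.class), FLOAT(Float.class), BOOL(Boolean.class);

        final Class<?> javaClass;

        Type(Class<?> javaClass) {
            this.javaClass = javaClass;
        }
    }

    // Returns true when the values match the declared types in number and order.
    // A null array is accepted only when no attributes are declared;
    // a null element never matches.
    static boolean matches(List<Type> declared, Object[] values) {
        if (values == null) {
            return declared.isEmpty();
        }
        if (values.length != declared.size()) {
            return false;
        }
        for (int i = 0; i < values.length; i++) {
            if (!declared.get(i).javaClass.isInstance(values[i])) {
                return false;
            }
        }
        return true;
    }
}
```

Applied to the payload used in this article (symbol, price, volume, max, min), the array {"IBM", 96.8, 300, 120.6, 70.4} matches the declared types, while swapping the price and volume values does not. Such a check costs time on every event, so it trades away some of the throughput that skipping validation is meant to provide.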

Writing A Sample Client

We need to have the following jars in the classpath for the Sample Data Publisher that we are going to build;

These can be found in the CARBON_HOME/repository/components/plugins folder of WSO2 BAM or WSO2 CEP.

    //The initialization and stream definition have to be done at the connection
    //initialization stage, and we need to reuse the streamId and dataPublisher when sending events
    DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin");
    String streamId1 = dataPublisher.defineStream("{" +
                                                  " 'name':'org.wso2.esb.MediatorStatistics'," +
                                                  " 'version':'2.3.0'," +
                                                  " 'nickName': 'Stock Quote Information'," +
                                                  " 'description': 'Some Desc'," +
                                                  " 'metaData':[" +
                                                  "           {'name':'ipAdd','type':'STRING'}" +
                                                  " ]," +
                                                  " 'payloadData':[" +
                                                  "           {'name':'symbol','type':'STRING'}," +
                                                  "           {'name':'price','type':'DOUBLE'}," +
                                                  "           {'name':'volume','type':'INT'}," +
                                                  "           {'name':'max','type':'DOUBLE'}," +
                                                  "           {'name':'min','type':'DOUBLE'}" +
                                                  " ]" +
                                                  "}");
    //In this case correlation data is null
    dataPublisher.publish(streamId1, new Object[]{""}, null, new Object[]{"IBM", 96.8, 300, 120.6, 70.4});
    //Only call this before shutting down the client
    dataPublisher.stop();

In most deployments we will need to send several events over a certain time period. In this case it is advisable to initialize the DataPublisher and call dataPublisher.defineStream() only once, in the init() method, then publish all the data, and finally call dataPublisher.stop() only when we are shutting down the client.

The contains a server for testing purposes and the contains a sample data publisher that can publish data to the test server


Data Bridge is a component that can be used to simultaneously publish events to both WSO2 CEP and WSO2 BAM. One of its major features is its Thrift transport, which is designed specifically to provide higher throughput and efficiency. Though we can simply instantiate a Data Publisher and publish data to a single Data Receiver endpoint, when sending data to multiple Data Receiver endpoints the Agent must be shared among the Data Publishers to achieve higher performance.


S. Suhothayan, Software Engineer, WSO2 Inc.

About Author

  • Sriskandarajah Suhothayan
  • Associate Director / Architect
  • WSO2