2013/11/14

Writing Custom Event Adaptor Types for WSO2 Complex Event Processor 3.0.0

  • Mohanadarshan Vivekanandalingam
  • Technical Lead - WSO2

Applies to

WSO2 CEP 3.0.0

Table of contents 

  1. Event adaptors in WSO2 CEP 3.0.0

  2. Importance of an event adaptor in WSO2 CEP and other WSO2 products

  3. Architecture of event adaptors

  4. Types of event adaptors

  5. Event adaptor implementations and supported message types

  6. Creating a custom event adaptor type

  7. How to deploy and undeploy a custom event adaptor 

  8. Summary

Event adaptors in WSO2 CEP 3.0.0

WSO2 CEP is an enterprise real-time event processing server that can monitor and detect pre-defined patterns in incoming events. It is built to be extremely high performing and massively scalable, and it offers significant time saving and affordable acquisition. WSO2 CEP 3.0.0 is a complete rewrite of WSO2 CEP.

 

The importance of an event adaptor in WSO2 CEP and other WSO2 products

An event adaptor is the mediation component between the WSO2 CEP engine and the event source or event sink; it connects these two ends by receiving and sending events. Event adaptors are not designed specifically for WSO2 CEP and can also be used by other WSO2 products, such as WSO2 BAM, as part of the event receiver and event notifier.
 
An event adaptor is written as an OSGI deployable unit, which makes it easy to deploy and undeploy. After a custom event adaptor implementation is completed, it needs to be exposed as an OSGI service. Thereafter, it will be tracked by the event adaptor Tracker service, which is based on the OSGI whiteboard pattern. The architectural diagram below illustrates how an event adaptor type module works internally in WSO2 CEP.
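The whiteboard idea described above can be sketched in plain Java without any OSGI dependency. This is a minimal illustration, not the WSO2 or OSGI API: `AdaptorFactory`, `ServiceRegistry` and `AdaptorTypeTracker` are hypothetical stand-ins for the factory service, the OSGI service registry and the Tracker service respectively. The point it shows is that the provider only registers itself, and the tracker learns about it through the registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for an event adaptor factory service.
interface AdaptorFactory {
    String typeName();
}

// Hypothetical stand-in for the service registry: providers register here,
// and observers are notified about every registration and removal.
class ServiceRegistry {
    interface RegistrationObserver {
        void added(AdaptorFactory factory);
        void removed(AdaptorFactory factory);
    }

    private final List<AdaptorFactory> services = new CopyOnWriteArrayList<>();
    private final List<RegistrationObserver> observers = new CopyOnWriteArrayList<>();

    void addObserver(RegistrationObserver observer) {
        observers.add(observer);
        // Replay services that were registered before the observer arrived.
        for (AdaptorFactory factory : services) {
            observer.added(factory);
        }
    }

    void register(AdaptorFactory factory) {
        services.add(factory);
        for (RegistrationObserver observer : observers) {
            observer.added(factory);
        }
    }

    void unregister(AdaptorFactory factory) {
        if (services.remove(factory)) {
            for (RegistrationObserver observer : observers) {
                observer.removed(factory);
            }
        }
    }
}

// Hypothetical tracker: maintains the list of known adaptor type names.
class AdaptorTypeTracker implements ServiceRegistry.RegistrationObserver {
    private final List<String> knownTypes = new ArrayList<>();

    @Override
    public void added(AdaptorFactory factory) {
        knownTypes.add(factory.typeName());
    }

    @Override
    public void removed(AdaptorFactory factory) {
        knownTypes.remove(factory.typeName());
    }

    List<String> knownTypes() {
        return new ArrayList<>(knownTypes);
    }
}

class WhiteboardDemo {
    public static void main(String[] args) {
        ServiceRegistry registry = new ServiceRegistry();
        AdaptorTypeTracker tracker = new AdaptorTypeTracker();
        registry.addObserver(tracker);

        AdaptorFactory testIn = () -> "testIn";
        registry.register(testIn);
        System.out.println(tracker.knownTypes()); // prints [testIn]

        registry.unregister(testIn);
        System.out.println(tracker.knownTypes()); // prints []
    }
}
```

Because the provider never calls the tracker directly, a bundle can be added or removed at runtime without either side holding a reference to the other.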

Types of event adaptors

There are two main types of event adaptors:
1) Input event adaptor
2) Output event adaptor

Input Event Adaptor – Input event adaptors receive events from the event source and forward them to subscribed listeners.

Output Event Adaptor – Output event adaptors are used to send notifications (events) from the WSO2 CEP engine to event sinks.
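The two directions can be pictured with a small, self-contained sketch. The class names `SketchInputAdaptor` and `SketchOutputAdaptor` are hypothetical and far simpler than the real adaptor classes; the "engine" here is just a pass-through listener. It only illustrates that an input adaptor pushes events towards listeners, while an output adaptor pushes events towards a sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical input adaptor: receives events from a source and forwards
// each one to every subscribed listener.
class SketchInputAdaptor {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    // Stands for whatever callback the real event source would invoke.
    void receiveFromSource(Object event) {
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Hypothetical output adaptor: hands events from the engine to an event sink.
class SketchOutputAdaptor {
    private final Consumer<Object> sink;

    SketchOutputAdaptor(Consumer<Object> sink) {
        this.sink = sink;
    }

    void publish(Object event) {
        sink.accept(event);
    }
}

class AdaptorFlowDemo {
    public static void main(String[] args) {
        List<Object> sinkContents = new ArrayList<>();
        SketchOutputAdaptor out = new SketchOutputAdaptor(sinkContents::add);
        SketchInputAdaptor in = new SketchInputAdaptor();

        // The "engine" in this sketch simply passes every event through.
        in.addListener(out::publish);

        in.receiveFromSource("temperature=42");
        System.out.println(sinkContents); // prints [temperature=42]
    }
}
```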

Event adaptor implementations and supported message types 

 
#   Input             Output
1   wso2Event         wso2Event
2   email             email
3   jms               jms
4   ws-event          ws-event
5   ws-event-local    ws-event-local
6   -                 sms
7   -                 http
8   -                 Cassandra
9   -                 MySQL

Several event adaptor type implementations are available in WSO2 CEP 3.0.0 by default. These event adaptor types are separate OSGI components and can easily be adopted by any WSO2 product.

The tables below show the message types supported by each of these event adaptor types, first for input event adaptors and then for output event adaptors. An asterisk (*) marks a supported message type.

Input Event Adaptor   XML   JSON   WSO2Event   Text   Map
email                 *     *      -           *      -
jms                   *     -      -           -      *
ws-event              *     -      -           -      -
ws-event-local        *     -      -           -      -
wso2event             -     -      *           -      -

Output Event Adaptor  XML   JSON   WSO2Event   Text   Map
email                 *     *      -           *      -
jms                   *     *      -           *      *
ws-event              *     -      -           -      -
ws-event-local        *     -      -           -      -
wso2event             -     -      *           -      -
sms                   -     -      -           *      -
http                  *     *      -           *      -

The Cassandra and MySQL event adaptors are used for persisting data: they write the events that come from the WSO2 CEP engine into the relevant database. This data can then be used for further processing, depending on the user's use case.
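The output adaptor matrix above can also be read as a lookup table. The following sketch encodes the rows of that matrix in a map and answers "does adaptor type X support message type Y?". The names `Msg` and `OutputAdaptorMatrix` are made up for this illustration and do not exist in WSO2 CEP.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Message types taken from the column headers of the matrix.
enum Msg { XML, JSON, WSO2EVENT, TEXT, MAP }

// Hypothetical helper that mirrors the output event adaptor matrix.
class OutputAdaptorMatrix {
    private static final Map<String, Set<Msg>> SUPPORTED = new LinkedHashMap<>();

    static {
        SUPPORTED.put("email", EnumSet.of(Msg.XML, Msg.JSON, Msg.TEXT));
        SUPPORTED.put("jms", EnumSet.of(Msg.XML, Msg.JSON, Msg.TEXT, Msg.MAP));
        SUPPORTED.put("ws-event", EnumSet.of(Msg.XML));
        SUPPORTED.put("ws-event-local", EnumSet.of(Msg.XML));
        SUPPORTED.put("wso2event", EnumSet.of(Msg.WSO2EVENT));
        SUPPORTED.put("sms", EnumSet.of(Msg.TEXT));
        SUPPORTED.put("http", EnumSet.of(Msg.XML, Msg.JSON, Msg.TEXT));
    }

    // Returns false for unknown adaptor types instead of throwing.
    static boolean supports(String adaptorType, Msg type) {
        Set<Msg> types = SUPPORTED.get(adaptorType);
        return types != null && types.contains(type);
    }

    public static void main(String[] args) {
        System.out.println(supports("jms", Msg.MAP));   // prints true
        System.out.println(supports("sms", Msg.XML));   // prints false
    }
}
```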

Creating a custom event adaptor type

As mentioned before, each event adaptor type is a separate OSGI bundle that can be deployed easily. Writing a custom event adaptor type and deploying it as an OSGI bundle is equally straightforward. Let's now see how to write a custom event adaptor type.

Custom input event adaptor type

First, let's look at how to write an input event adaptor type called “testIn”. To write a custom input event adaptor type, we have to extend "org.wso2.carbon.event.input.adaptor.core.AbstractInputEventAdaptor" and implement "org.wso2.carbon.event.input.adaptor.core.InputEventAdaptorFactory", both of which are available in the jar “org.wso2.carbon.event.input.adaptor.core_1.X.X.jar”. The subclass of "org.wso2.carbon.event.input.adaptor.core.AbstractInputEventAdaptor" contains the input event adaptor logic used to receive events, and the implementation of "org.wso2.carbon.event.input.adaptor.core.InputEventAdaptorFactory" serves as the factory that creates your input event adaptor type.

You can start from the sample project "testIn.zip", which can be found in the attachments section of this article. org.wso2.carbon.event.input.adaptor.core_1.X.X.jar is available in <CEP_HOME>/repository/components/plugins/, from where you can import it into your Maven project classpath. It is also available in the WSO2 public Maven repository, which you can add to your project pom.xml as follows.

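<repositories>
     <repository>
          <id>wso2-maven2-repository</id>
          <name>WSO2 Maven2 Repository</name>
          <!-- Assumed location of the WSO2 public repository; verify before use -->
          <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
     </repository>
</repositories>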

When you extend the “AbstractInputEventAdaptor”, TestInEventAdaptorType needs to implement the following methods:

public class TestInEventAdaptorType extends AbstractInputEventAdaptor {

    /**
     * Returns the name of the input event adaptor type.
     *
     * @return event adaptor type name
     */
    @Override
    protected String getName() {
        return null;  // TODO: return the adaptor type name
    }

    /**
     * Returns the message types supported by this event adaptor type.
     *
     * @return list of supported message types
     */
    @Override
    protected List<String> getSupportedInputMessageTypes() {
        return null;  // TODO: return the supported message types
    }

    /**
     * Any initialization can be done in this method.
     */
    @Override
    protected void init() {
        // TODO: initialize resources needed by the adaptor
    }

    /**
     * Returns the adaptor-related properties of this event adaptor type.
     *
     * @return list of properties related to the event adaptor
     */
    @Override
    protected List<Property> getInputAdaptorProperties() {
        return null;  // TODO: return the adaptor-level properties
    }

    /**
     * Returns the message-related properties of this event adaptor type.
     *
     * @return list of properties related to a message type
     */
    @Override
    protected List<Property> getInputMessageProperties() {
        return null;  // TODO: return the message-level properties
    }

    /**
     * Subscribes to the connection specified in the input event adaptor configuration.
     *
     * @param inputEventAdaptorMessageConfiguration
     *                                      - configuration related to the message
     * @param inputEventAdaptorListener - the event adaptor type invokes this when it receives events
     * @param inputEventAdaptorConfiguration
     *                                      - configuration related to the event adaptor
     * @param axisConfiguration
     * @return the subscription ID, which is later passed to unsubscribe()
     */
    @Override
    public String subscribe(
            InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration,
            InputEventAdaptorListener inputEventAdaptorListener,
            InputEventAdaptorConfiguration inputEventAdaptorConfiguration,
            AxisConfiguration axisConfiguration) {
        return null;  // TODO: start receiving events and return a subscription ID
    }

    /**
     * Removes the given subscription from the event adaptor.
     *
     * @param inputEventAdaptorMessageConfiguration
     *                                      - configuration related to the message
     * @param inputEventAdaptorConfiguration
     *                                      - configuration related to the event adaptor
     * @param axisConfiguration
     * @param subscriptionId - the ID returned by subscribe()
     */
    @Override
    public void unsubscribe(
            InputEventAdaptorMessageConfiguration inputEventAdaptorMessageConfiguration,
            InputEventAdaptorConfiguration inputEventAdaptorConfiguration,
            AxisConfiguration axisConfiguration, String subscriptionId) {
        // TODO: stop receiving events for this subscription
    }
}

The subscribe() method is triggered when a subscription is found for this event adaptor type. In this method you start listening for events from the event source; whenever an event is received, call the onEvent() method of the inputEventAdaptorListener object that was passed in. The events passed through onEvent() can be of any of the types mentioned above (xml, json, wso2event, text and map).
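The subscribe/onEvent/unsubscribe cycle can be sketched as follows. This is a simplified illustration under stated assumptions: `SketchListener` is a hypothetical stand-in for InputEventAdaptorListener (only its onEvent() method is taken from the article), the configuration parameters are left out, and using a random UUID as the subscription ID is an assumption of this sketch rather than a requirement of the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for InputEventAdaptorListener.
interface SketchListener {
    void onEvent(Object event);
}

// Hypothetical, simplified input adaptor that tracks subscriptions by ID.
class SketchTestInAdaptor {
    private final Map<String, SketchListener> subscriptions = new ConcurrentHashMap<>();

    // Remembers the listener and returns an ID that identifies the subscription.
    String subscribe(SketchListener listener) {
        String subscriptionId = UUID.randomUUID().toString();
        subscriptions.put(subscriptionId, listener);
        return subscriptionId;
    }

    // Forgets the listener registered under the given ID.
    void unsubscribe(String subscriptionId) {
        subscriptions.remove(subscriptionId);
    }

    // Stands for the callback of the real event source (socket, queue, ...).
    void onSourceMessage(Object event) {
        for (SketchListener listener : subscriptions.values()) {
            listener.onEvent(event);
        }
    }
}

class SubscribeDemo {
    public static void main(String[] args) {
        SketchTestInAdaptor adaptor = new SketchTestInAdaptor();
        List<Object> received = new ArrayList<>();

        String id = adaptor.subscribe(received::add);
        adaptor.onSourceMessage("<event>first</event>");

        adaptor.unsubscribe(id);
        adaptor.onSourceMessage("<event>second</event>"); // no longer delivered

        System.out.println(received.size()); // prints 1
    }
}
```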

Now we need to create a factory class by implementing the “InputEventAdaptorFactory” interface. Here, you need to implement the getEventAdaptor() method as shown below.

public class TestInEventAdaptorFactory implements InputEventAdaptorFactory {

    /**
     * Returns the input event adaptor type object.
     *
     * @return a new instance of the custom input event adaptor type
     */
    public AbstractInputEventAdaptor getEventAdaptor() {
        return new TestInEventAdaptorType();
    }
}

This method returns an object of the event adaptor type that we have implemented. Now comes the important part of a custom event adaptor type implementation: after completing the steps above, you must expose the event adaptor factory as an OSGI service. Only then can the input event adaptor core identify it as a new implementation and add it to the input event adaptor type list. For this, we use the OSGI whiteboard pattern (for more information about the whiteboard pattern, please refer to https://mohanadarshan.wordpress.com/2013/06/23/osgi-white-board-pattern-with-a-sample/).

Here's how you can expose the custom event adaptor implementation as an OSGI service. The service must be registered under the “InputEventAdaptorFactory” type.

/**
 * @scr.component name="input.testInEventAdaptorService.component" immediate="true"
 */
 
public class TestInEventAdaptorServiceDS {
 
    private static final Log log = LogFactory.getLog(TestInEventAdaptorServiceDS.class);
 
    protected void activate(ComponentContext context) {
 
        try {
            InputEventAdaptorFactory testInEventAdaptorFactory = new TestInEventAdaptorFactory();
            context.getBundleContext().registerService(InputEventAdaptorFactory.class.getName(), testInEventAdaptorFactory, null);
            log.info("Successfully deployed the input TestIn event adaptor service");
        } catch (RuntimeException e) {
            log.error("Can not create the input TestIn event adaptor service ", e);
        }
    }
}

Here we create an instance of the factory for the custom input event adaptor type “testIn”, register it in the bundle context, and thereby expose it as an OSGI service.
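Returning to the TestInEventAdaptorType skeleton earlier in this section, its metadata methods return null until you fill them in. The sketch below shows one way they might look. Everything in it is illustrative: `SketchProperty` is a hypothetical stand-in for the Property class of the adaptor core bundle (whose real constructor and setters may differ), and the property names "host", "port" and "topic" as well as the message type strings are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Property class of the adaptor core bundle.
class SketchProperty {
    private final String name;
    private final String displayName;
    private final boolean required;

    SketchProperty(String name, String displayName, boolean required) {
        this.name = name;
        this.displayName = displayName;
        this.required = required;
    }

    String getName() { return name; }
    String getDisplayName() { return displayName; }
    boolean isRequired() { return required; }
}

// Hypothetical metadata for the "testIn" adaptor type.
class SketchTestInMetadata {

    String getName() {
        return "testIn";
    }

    // Assumed message type identifiers; the real constants may be spelled differently.
    List<String> getSupportedInputMessageTypes() {
        return Arrays.asList("xml", "text");
    }

    // Properties that describe the adaptor as a whole (for example, the connection).
    List<SketchProperty> getInputAdaptorProperties() {
        List<SketchProperty> properties = new ArrayList<>();
        properties.add(new SketchProperty("host", "Host", true));
        properties.add(new SketchProperty("port", "Port", false));
        return properties;
    }

    // Properties that describe a single message subscription.
    List<SketchProperty> getInputMessageProperties() {
        List<SketchProperty> properties = new ArrayList<>();
        properties.add(new SketchProperty("topic", "Topic", true));
        return properties;
    }
}

class MetadataDemo {
    public static void main(String[] args) {
        SketchTestInMetadata metadata = new SketchTestInMetadata();
        System.out.println(metadata.getName()); // prints testIn
        for (SketchProperty p : metadata.getInputAdaptorProperties()) {
            System.out.println(p.getDisplayName() + " required=" + p.isRequired());
        }
    }
}
```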

Custom output event adaptor type

Next, let's look at how to write an output event adaptor type called “testOut”. To write a custom output event adaptor type, we have to extend "org.wso2.carbon.event.output.adaptor.core.AbstractOutputEventAdaptor" and implement "org.wso2.carbon.event.output.adaptor.core.OutputEventAdaptorFactory", both of which are available in the jar “org.wso2.carbon.event.output.adaptor.core_1.X.X.jar”. The subclass of "org.wso2.carbon.event.output.adaptor.core.AbstractOutputEventAdaptor" contains the output event adaptor logic used to send events, and the implementation of "org.wso2.carbon.event.output.adaptor.core.OutputEventAdaptorFactory" serves as the factory that creates your output event adaptor type.

You can start from the sample project "testOut.zip", which can be found in the attachments section at the end of this article. org.wso2.carbon.event.output.adaptor.core_1.X.X.jar is available in <CEP_HOME>/repository/components/plugins/, from where you can import it into your Maven project classpath. It is also available in the WSO2 public Maven repository, which you can add to your project pom.xml as shown below.

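<repositories>
     <repository>
       <id>wso2-maven2-repository</id>
       <name>WSO2 Maven2 Repository</name>
       <!-- Assumed location of the WSO2 public repository; verify before use -->
       <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
     </repository>
</repositories>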

When you extend the “AbstractOutputEventAdaptor”, TestOutEventAdaptorType needs to implement the following methods:

 

public class TestOutEventAdaptorType extends AbstractOutputEventAdaptor {

    /**
     * Returns the name of the output event adaptor type.
     *
     * @return event adaptor name
     */
    @Override
    protected String getName() {
        return null;  // TODO: return the adaptor type name
    }

    /**
     * Returns the message types supported by this event adaptor type.
     *
     * @return list of supported output message types
     */
    @Override
    protected List<String> getSupportedOutputMessageTypes() {
        return null;  // TODO: return the supported message types
    }

    /**
     * Any initialization can be done in this method.
     */
    @Override
    protected void init() {
        // TODO: initialize resources needed by the adaptor
    }

    /**
     * Returns the adaptor-related properties of this event adaptor type.
     *
     * @return list of properties related to the output event adaptor
     */
    @Override
    protected List<Property> getOutputAdaptorProperties() {
        return null;  // TODO: return the adaptor-level properties
    }

    /**
     * Returns the message-related properties of this event adaptor type.
     *
     * @return list of properties related to the message type
     */
    @Override
    protected List<Property> getOutputMessageProperties() {
        return null;  // TODO: return the message-level properties
    }

    /**
     * Publishes an event.
     *
     * @param outputEventAdaptorMessageConfiguration
     *                - message-related configuration needed to publish messages
     * @param message - message to send
     * @param outputEventAdaptorConfiguration
     *                - configuration related to the event adaptor
     * @param tenantId - ID of the tenant the event belongs to
     */
    @Override
    public void publish(
            OutputEventAdaptorMessageConfiguration outputEventAdaptorMessageConfiguration,
            Object message,
            OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) {
        // TODO: send the message to the event sink
    }

    /**
     * Checks the connectivity with the receiver end.
     *
     * @param outputEventAdaptorConfiguration
     *                - configuration related to the event adaptor
     * @param tenantId - ID of the tenant the connection is tested for
     */
    @Override
    public void testConnection(
            OutputEventAdaptorConfiguration outputEventAdaptorConfiguration, int tenantId) {
        // TODO: verify that the event sink is reachable
    }
}
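Because the message parameter of publish() in the skeleton above is typed as Object, an implementation usually has to look at what it actually received. The sketch below shows one possible approach for a hypothetical text-only sink: map payloads are flattened into key=value pairs, and everything else is rendered with String.valueOf(). The class `SketchTestOutPublisher` is invented for this example, and the assumption that text, XML and JSON payloads have a meaningful string form should be checked against the objects CEP really passes in.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that turns a published message into one line of text.
class SketchTestOutPublisher {

    static String render(Object message) {
        if (message instanceof Map) {
            // Map payloads: flatten the entries into "key=value" pairs.
            StringBuilder line = new StringBuilder();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) message).entrySet()) {
                if (line.length() > 0) {
                    line.append(", ");
                }
                line.append(entry.getKey()).append('=').append(entry.getValue());
            }
            return line.toString();
        }
        // Assumption: all other payloads have a usable string form.
        return String.valueOf(message);
    }

    public static void main(String[] args) {
        Map<String, Object> mapEvent = new LinkedHashMap<>();
        mapEvent.put("symbol", "WSO2");
        mapEvent.put("price", 55.6);

        System.out.println(render(mapEvent));              // prints symbol=WSO2, price=55.6
        System.out.println(render("{\"symbol\":\"WSO2\"}")); // prints {"symbol":"WSO2"}
    }
}
```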
 

The publish() method is called when the CEP sends an output, so your logic for publishing events goes in this method. The "message" parameter of publish() can be any of text, map, xml, wso2event or json. After implementing the custom output event adaptor logic, we need to create a factory class by implementing the “OutputEventAdaptorFactory” interface. Here you need to implement the getEventAdaptor() method as shown below.

 

public class TestOutEventAdaptorFactory implements OutputEventAdaptorFactory {

    /**
     * Returns the output event adaptor type object.
     *
     * @return a new instance of the custom output event adaptor type
     */
    public AbstractOutputEventAdaptor getEventAdaptor() {
        return new TestOutEventAdaptorType();
    }
}

As with the input event adaptor, we need to expose the custom output event adaptor type implementation as an OSGI service. The testOut event adaptor is exposed as shown below.

 

/**
 * @scr.component name="output.testOutEventAdaptorService.component" immediate="true"
 */
 
public class TestOutEventAdaptorServiceDS {
 
    private static final Log log = LogFactory.getLog(TestOutEventAdaptorServiceDS.class);
 
    protected void activate(ComponentContext context) {
 
        try {
            OutputEventAdaptorFactory testOutEventAdaptorFactory = new TestOutEventAdaptorFactory();
            context.getBundleContext().registerService(OutputEventAdaptorFactory.class.getName(), testOutEventAdaptorFactory, null);
            log.info("Successfully deployed the output TestOut event adaptor service");
        } catch (RuntimeException e) {
            log.error("Can not create the output TestOut event adaptor service ", e);
        }
    }
}

Here we expose the factory of the custom output event adaptor as an OSGI service. An event adaptor type Tracker in the event adaptor core component tracks newly deployed event adaptor type implementations.

How to deploy and undeploy a custom event adaptor

Deploying and undeploying a custom event adaptor type is very simple in WSO2 CEP 3.0.0. Implement the custom event adaptor type (input or output) as shown above (you can use the sample projects in the attachment), build the project, and copy the OSGI bundle created in the "target" folder into <CEP_HOME>/repository/components/dropins. Then start the CEP server; the newly created event adaptor type services appear in the server startup logs.

 

The newly created custom event adaptor type will also be visible in the UI with its properties. The figure below shows how the newly created "testIn" custom input event adaptor type appears in the UI. You can now create several instances of this event adaptor type.

 

Summary

As described above, an event adaptor is the mediation component between a CEP engine and the event source or event sink; it connects these two ends by receiving and sending events. Event adaptors are not designed specifically for WSO2 CEP and can also be used by other WSO2 products, such as WSO2 BAM, as part of the event receiver and event notifier. This article gave an overview of event adaptors and their usage, along with guidelines on how to write and deploy a custom event adaptor type. For more details about event adaptors and WSO2 CEP 3.0.0, we recommend reviewing the online WSO2 CEP 3.0.0 documentation. You can also use the project files provided in the attachment below as a starting point for your own custom event adaptor types.

 

 
