WSO2 CEP - User Guide

[Documentation Index]

WSO2 Complex Event Processing Server (CEP) User Guide

The goal of this guide is to provide guidelines to be followed in order to get familiar with complex event processing server UI and the procedure of creating and testing a CEP bucket.

Contents

Introduction to CEP

This gives a brief introduction of how complex event processing server can be used in processing events.

Component Architecture

CEP Event Processing Diagram

As in the Diagram CEP Server consists of four main components

CEP Bucket

CEP Bucket is an instance of CEP Engine and it is the core of the Complex event processing server. Total processing on received events and triggering of new events happen in this engine.

Carbon Broker

As explained in the name, this is a broker between event server and the CEP Engine. There are three types of brokers which are Local,WS Event and JMS . If the user uses WS Event broker to receive and fire events, it is essential to use a broker which having it's type as WS Event and if the user uses QPid broker, it is essential to use JMS Type broker.

Broker Manager

This component is used to create the carbon broker. When create a broker with this component, it is needed to provide the type of the broker and other essential properties.

Qpid Broker

This component is embedded to the CEP Server and if the carbon broker type is JMS, User can use this component to receive and publish events.

CEP Bucket

The base of WSO2 Complex Event Processing Server (CEP) is BUCKET. A bucket can be considered as an instance of a CEP back end runtime. For each and every bucket created in CEP Server, it creates an instance of the back end runtime. A bucket basically consists of three main attributes

Engine Provider

This is the back end runtime engine that process events.There are two types of back end runtime engines in CEP Server, They are

User is possible to select the runtime as his/her preference. The only thing need to consider when selecting the runtime engine is Query, Since query structure differs with the back end runtime used.

Inputs

Complex Event Processing Server is triggered by events. When some event source publish event or stream of events , there should be a topic to publish the events to be received by the CEP Server. By defining inputs, we are defining a topic where an event source can publish to. An input consists of five main elements

Query

Filtering and triggering new events on received events is the basic idea of the CEP Server. To filter events there should be some criteria specified in the CEP Engine. This criteria which is needed to filter events is provided with the query of the CEP Bucket. There can be one or more queries and user can decide the number of queries required to perform the expected task.

Query consists of three elements.

Creating Bucket

In Manage Menu Section , you will find the menu "CEP Buckets" and you will find all the available buckets, if you click on List menu and You will be able to create a new bucket , when you click on "Add" menu item.

A bucket can have three states

In any of the above states, if you click on save button , It will save to back end and user will be able to see the created bucket in the bucket list. Apart from that the created bucket will be saved to the Configuration registry under "CEPBuckets" tree. If the user save the bucket in state 1 or state 2, he/she is able to add Inputs , Queries to the bucket at later time.

Stock Quote Sample

Description:

This sample is based on Stock Quotes in any Stock Market. In stock exchange people sell and buy shares of various companies. If a company sell its shares in a stock exchange that particular company is recognized with a particular symbol in that stock exchange. Some of the sample symbols are "MSFT- Microsoft Cooperation , GOOG - Google, ORCL- Oracle , YHOO - Yahoo". When execute trades on a particular symbol it always has some details like bellow;

People who are interested in Stock Market are keen on the last traded price of various symbols and they perform their trades by considering last traded values of symbols. So In this example, We are filtering out the stock quotes , if its last traded value is greater than 100.

In WsO2 CEP, There are six possible ways to implement this sample, Here we are explaining a simple example of stock quotes.

Fusion Back End Runtime With Local Broker

Stock Quote sample is based on Stock Quotes of any Stock Exchange. Before creating the bucket to filter stock quotes it is essential to have a Carbon broker. Since In this example we are going to use local Broker, it is needed to create a broker with type local.

Creating a broker with type 'local'

CEP Create Local Broker


Step 1 :
    Start CEP Server and log in as admin

Step 2 :
    In the Configure menu you can find a Menu item called "Broker" and under that you can see sub menu 'Add' and click on that

Step 3 :
    You will get a page with header "Create a New Broker" and you need to enter following details in that form to create a local broker

    Broker Name : localBroker
    Broker Type : local

    Finally click on Add Broker button and you will get the added broker to the list of available brokers.

Create bucket with Fusion

Use the "Add" menu in CEP Buckets to add buckets

State 1 - Basic Information

Bucket Name (Name of the bucket) 		: StockQuoteAnalyzer
Description (Description about the bucket) 	: This bucket analyzes stock quotes and trigger an event if the last
						  traded amount is greater than 100.
Engine Provider(CEP Runtime engine to be used)  : DroolsFusionCEPRuntime  [Choose from the drop down]

CEP Create Bucket -State 1

State 2 - Defining Inputs

Defining Inputs : Click on Add Input link and it will provide a form to define inputs. You can define the sample input by entering following values


Topic( topic to events be received) 			: AllStockQuotes
Broker Name (Broker to be used)     			: localBroker
Mapping Stream (Name of the event stream)		: allStockQuotes

-Defining xpaths (Defining the )
  XpathDefinition (xpath prefixes and NS)-Prefix	: quotedata
  XpathDefinition (xpath prefixes and NS)-Namespace	: http://ws.cdyne.com/

**Note : Click on add button to add the defined xpath and it will be appeared in the xpath definitions table once it added

-Defining input properties (these properties will be extracted from the received xml event and fed to the CEP engine)
  Property  -	Name	: symbol
		xpath 	: //quotedata:StockQuoteEvent/quotedata:StockSymbol
		type	: java.lang.String [Choose from the drop down ]

**Note : Click on add button to add the defined property to the input and it will be appeared in the Properties table once it added

  Property	Name	: price
		xpath	: //quotedata:StockQuoteEvent/quotedata:LastTradeAmount
		type    : java.lang.Double [Choose from the drop down]

**Note : Click on add button to add the defined property to the input and it will be appeared in the Properties table once it added

*** Note : After filling all required fields click on the add button to add Input to the Bucket. Once you clicked it will disappear the input form and added input will be appeared in Inputs table.

CEP Add Input

State 3 - Defining Queries

Query Name (To identify the query)			: ConditionalStocksDetector

**Note : We have provided two ways to provide a query expression.

Expression (Can be type inline or can get from registry):
							package org.wso2.carbon.cep.fusion;
							import java.util.HashMap;
							global org.wso2.carbon.cep.fusion.listener.FusionEventListener fusionListener;
							declare HashMap
							@role( event )
							end
							rule Invoke_Stock_Quotes
							when
							    $stockQuote : HashMap($symbol : this["symbol"], $stockPrice : this["price"], this["picked"] != "true") over
								window:time(2m) from entry-point "allStockQuotes";
							    eval((Double)$stockPrice > 100);
							then
							    $stockQuote.put("picked","true");
							    update($stockQuote);
							    HashMap $fastMovingStock = new HashMap();
							    $fastMovingStock.put("price",$stockPrice);
							    $fastMovingStock.put("symbol",$symbol);
							    fusionListener.onEvent($fastMovingStock);
							end


Defining the Output :
         Here we are defining the topic which filtered events are published and the structure of the output xml event

    Topic(topic which filtered event published)		: ConditionSatisfyingStockQuotes
    BrokerName (Broker to be used)			        : localBroker

**Note : There are two possible ways to define output event

    xml Mapping (Output XML Element- Parameters which are needed to be replaced with filtered vales are given in curly brackets)
							:	<quotedata:StockQuoteDataEvent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
								xmlns:xsd="http://www.w3.org/2001/XMLSchema"
								xmlns:quotedata="http://ws.cdyne.com/">
									<quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>
									<quotedata:LastTradeAmount>{price}</quotedata:LastTradeAmount>
								</quotedata:StockQuoteDataEvent>

*** Note : After filling all the required fields of the Query , click on add query button to add Query to the bucket. Once you clicked it will disappear the query form and added query will be appeared in Queries table.

**** Note : As the last step of adding a bucket , click on save button. Once you clicked it will redirect to the deployed buckets page and deployed bucket will be appeared in buckets table. Further to check whether you have entered the details correctly you can click on the link with the defined bucket and it will show added details. You can come back to the previous adding bucket page by clicking on the back button provided or if needed to edit the bucket you can click on the "Edit" link on the top of the View bucket Page.

CEP Add Query

Invoking Deployed Bucket

When the bucket is successfully deployed, an axis2 service will be automatically created with the name you given for the broker name. In this case the name of the service will be 'localBrokerService'. This service can be used to send the input event stream to CEP engine.

Defining output subscriber

Standalone Mode

When the user send events to CEP engine with this service, there should be a subscriber to the output topic given, when configuring the query of the bucket, to receive filtered events from the complex event processing engine. So before send events to the engine we need to have an axis2 service deployed which prints received string to the console. Then create a subscription to the output topic , with providing the URL of the axis2 service as the subscription URL.

You can find the source sample axis2 service "FastMovingStockQuoteService" in the location : /samples/services/FastMovingStockQuoteReceiverService

go in to this folder and execute the following command : "ant"

When you done this , it will place "FastMovingStockQuoteService.aar" file in the location:

/repository/deployment/server/axis2services/

you will be able to see the axis2 service in the services list.

Now create a topic with the same name you provided in output and subscribe to it by providing the event sink URL

Stratos Mode

In stratos mode , All the topic creations are same other than the "Event Sink URL"

Here the Event Sink URL should be to the message box you created above.

this url prepares as follows : sqs://loggedInUserName+/+MessageBoxName

If my username is "foo" and my message box name is "bar" then the event sink URL should be as follows;

sqs://foo/bar

So provide the EventSinkURL appropriately.

Specifically , if you have logged in as shammi@brs.com and my messagebox name is "FastMovingStocks" , then Event Sink URL would be;

sqs://shammi/FastMovingStocks

You can test whether this is working fine by publishing a XML element to publish section and check the message box.

Creating topic and subscribing

In order to subscribe, you need to create a topic.

        Step 1: Click on "Add" menu item under "Topics" Menu in Manage section of the left panel

        

Add Topic

Step 2: Specify the topic name in the provided text box , in this case topic name is : "ConditionSatisfyingStockQuotes" and click on 'Add Topic' button. This will add the topic to the server and you will be directed to the Topic Browser page.

Topic Browser

Step 2: Once you click on the topic in topic browser page you will be able see four links as in the above image. Click in the subscribe link and you will be directed to Subscribe page.

Subscribe

Step 3: Create subscription with following details, Once you click on the "Subscribe" button, you will be directed to the Topic Browser page. topic : ConditionSatisfyingStockQuotes (Output topic) subscription mode : Topic only subscription URL : http://localhost:9763/services/FastMovingStockQuoteService/getOMElement expiration Time : select a future date from calender Step 4 : You can verify whether you have correctly subscribe to the topic by click on "Details" link of that topic in topic browser page. Once you click on that , you will be directed to the "topic details " page and there you will find all the subscriptions for that topic and its children (if exists) and permission on that topic. Apart from that with the publish section, you can publish a test xml message to that topic and check whether it is received to you subscription URL.

Topic Details

Sending events to CEP Engine

There are two possible ways that can be used to send input events using this axis2 service.

Sending events directly to the axis2 service

Following class can be used to send events to the axis service;

To run this class it is required to have Axis2 and Axiom binary distribution's library files.

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.ServiceClient;

import javax.xml.stream.XMLStreamException;

     public class SampleClient {
    public static void main(String[] args) {
        ServiceClient serviceClient = null;
        try {
            serviceClient = new ServiceClient();
            serviceClient.setTargetEPR(new EndpointReference("http://localhost:9763/services/localBrokerService/AllStockQuotes"));
            } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
        if (serviceClient != null) {
            String xmlElement1 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
                    "                    <quotedata:StockQuoteEvent>\n" +
                    "              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
                    "              <quotedata:LastTradeAmount>99.55</quotedata:LastTradeAmount>\n" +
                    "              <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
                    "              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
                    "              <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
                    "              <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
                    "              <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
                    "              <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
                    "              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
                    "              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
                    "              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
                    "              <quotedata:PE>10.88</quotedata:PE>\n" +
                    "              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
                    "              <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
                    "                    </quotedata:StockQuoteEvent>\n" +
                    "                </quotedata:AllStockQuoteStream>";

            String xmlElement2 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
                    "                    <quotedata:StockQuoteEvent>\n" +
                    "              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
                    "              <quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>\n" +
                    "              <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
                    "              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
                    "              <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
                    "              <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
                    "              <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
                    "              <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
                    "              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
                    "              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
                    "              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
                    "              <quotedata:PE>10.88</quotedata:PE>\n" +
                    "              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
                    "              <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
                    "                    </quotedata:StockQuoteEvent>\n" +
                    "                </quotedata:AllStockQuoteStream>";

            String xmlElement3 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
                    "                    <quotedata:StockQuoteEvent>\n" +
                    "              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
                    "              <quotedata:LastTradeAmount>99.98</quotedata:LastTradeAmount>\n" +
                    "              <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
                    "              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
                    "              <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
                    "              <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
                    "              <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
                    "              <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
                    "              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
                    "              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
                    "              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
                    "              <quotedata:PE>10.88</quotedata:PE>\n" +
                    "              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
                    "              <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
                    "                    </quotedata:StockQuoteEvent>\n" +
                    "                </quotedata:AllStockQuoteStream>";


            OMElement omElement1 = null;
            OMElement omElement2 = null;
            OMElement omElement3 = null;
            try {
                omElement1 = AXIOMUtil.stringToOM(xmlElement1);
                omElement2 = AXIOMUtil.stringToOM(xmlElement2);
                omElement3 = AXIOMUtil.stringToOM(xmlElement3);
                serviceClient.fireAndForget(omElement1);
                serviceClient.fireAndForget(omElement2);
                serviceClient.fireAndForget(omElement3);
            } catch (XMLStreamException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (AxisFault axisFault) {
                axisFault.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }


        }

    }
}

When you run this class, if you have subscribed correctly to the output topic of the query, you will be able to see the filtered events in the console as bellow.

<xmlmapping><quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                            <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
			    <quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>
                            <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
                        </quotedata:StockQuoteDataEvent></xmlmapping>

Subscribe to a topic with the axis2 service URL and send events to that topic via a client or Publish feature

Invoking the deployed bucket via Publish feature can be tested as bellow

In order to send a message to a topic via Publish feature , you need to create a topic and a subscription

Step 01: Create a Topic

Step 02: Create subscription to the created topic

Step 03: Publishing message to the topic

Sample Inputs : Use following xml elements ONE by ONE and invoke the service,

<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:StockQuoteEvent>
              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
              <quotedata:LastTradeAmount>99.55</quotedata:LastTradeAmount>
              <quotedata:StockChange>0.05</quotedata:StockChange>
              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>
              <quotedata:DayHigh>25.46</quotedata:DayHigh>
              <quotedata:DayLow>25.01</quotedata:DayLow>
              <quotedata:StockVolume>20452658</quotedata:StockVolume>
              <quotedata:PrevCls>25.31</quotedata:PrevCls>
              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>
              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
              <quotedata:PE>10.88</quotedata:PE>
              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
              <quotedata:QuoteError>false</quotedata:QuoteError>
                    </quotedata:StockQuoteEvent>
                </quotedata:AllStockQuoteStream>


<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:StockQuoteEvent>
              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
              <quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>
              <quotedata:StockChange>0.05</quotedata:StockChange>
              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>
              <quotedata:DayHigh>25.46</quotedata:DayHigh>
              <quotedata:DayLow>25.01</quotedata:DayLow>
              <quotedata:StockVolume>20452658</quotedata:StockVolume>
              <quotedata:PrevCls>25.31</quotedata:PrevCls>
              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>
              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
              <quotedata:PE>10.88</quotedata:PE>
              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
              <quotedata:QuoteError>false</quotedata:QuoteError>
                    </quotedata:StockQuoteEvent>
                </quotedata:AllStockQuoteStream>

<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:StockQuoteEvent>
              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
              <quotedata:LastTradeAmount>99.98</quotedata:LastTradeAmount>
              <quotedata:StockChange>0.05</quotedata:StockChange>
              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>
              <quotedata:DayHigh>25.46</quotedata:DayHigh>
              <quotedata:DayLow>25.01</quotedata:DayLow>
              <quotedata:StockVolume>20452658</quotedata:StockVolume>
              <quotedata:PrevCls>25.31</quotedata:PrevCls>
              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>
              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
              <quotedata:PE>10.88</quotedata:PE>
              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
              <quotedata:QuoteError>false</quotedata:QuoteError>
                    </quotedata:StockQuoteEvent>
                </quotedata:AllStockQuoteStream>

CEP Subscribe and Publish to a topic

You will be able to get the same output in the console as in previous test method

References

EsperTM is a registered trademark of EsperTech.