This guide helps you get familiar with the Complex Event Processing (CEP) Server UI and walks through the procedure for creating and testing a CEP bucket.
It also gives a brief introduction to how the Complex Event Processing Server can be used to process events.

As shown in the diagram, the CEP Server consists of four main components.
A CEP Bucket is an instance of the CEP Engine and is the core of the Complex Event Processing Server. All processing of received events and all triggering of new events happen in this engine.
As the name suggests, this is a broker between the event server and the CEP Engine. There are three types of brokers: Local, WS Event, and JMS. If you use a WS Event broker to receive and fire events, you must use a broker of type WS Event; if you use a Qpid broker, you must use a broker of type JMS.
This component is used to create the Carbon broker. When you create a broker with this component, you must provide the type of the broker and its other essential properties.
This component is embedded in the CEP Server. If the Carbon broker type is JMS, you can use this component to receive and publish events.
The foundation of the WSO2 Complex Event Processing Server (CEP) is the bucket. A bucket can be considered an instance of a CEP back-end runtime: for every bucket created in the CEP Server, an instance of the back-end runtime is created. A bucket consists of three main attributes.
This is the back-end runtime engine that processes events. The CEP Server provides two types of back-end runtime engines.
You can select whichever runtime you prefer. The only thing to consider when selecting the runtime engine is the query, since the query structure differs depending on the back-end runtime used.
The Complex Event Processing Server is triggered by events. When an event source publishes an event or a stream of events, there must be a topic to which the events are published so that the CEP Server can receive them. Defining an input defines a topic that an event source can publish to. An input consists of five main elements.
Filtering received events and triggering new events based on them is the basic idea of the CEP Server. To filter events, criteria must be specified in the CEP Engine. These criteria are provided by the queries of the CEP bucket. A bucket can have one or more queries, and you decide how many are required to perform the expected task.
A query consists of three elements.
In the Manage menu section you will find the "CEP Buckets" menu. Click the "List" menu item to see all the available buckets, or click the "Add" menu item to create a new bucket.
A bucket can have three states.
In any of the above states, clicking the Save button saves the bucket to the back end, and the created bucket then appears in the bucket list. The created bucket is also saved to the Configuration registry under the "CEPBuckets" tree. If you save the bucket in state 1 or state 2, you can add inputs and queries to it later.
This sample is based on stock quotes in a stock market. In a stock exchange, people buy and sell shares of various companies. A company that sells its shares in a stock exchange is identified by a particular symbol in that exchange. Some sample symbols are "MSFT - Microsoft Corporation, GOOG - Google, ORCL - Oracle, YHOO - Yahoo". A trade executed on a particular symbol always carries details such as those below.
People who are interested in the stock market follow the last traded price of various symbols and make their trades based on those values. In this example, we therefore filter the stock quotes and pick those whose last traded value is greater than 100.
In WSO2 CEP there are six possible ways to implement this sample. Here we explain a simple stock quote example.
The Stock Quote sample is based on the stock quotes of any stock exchange. Before creating the bucket that filters stock quotes, you must have a Carbon broker. Since this example uses a local broker, you need to create a broker of type local.

Step 1 :
Start the CEP Server and log in as admin.
Step 2 :
In the Configure menu, find the menu item called "Broker" and click its "Add" sub-menu.
Step 3 :
A page with the header "Create a New Broker" appears. Enter the following details in the form to create a local broker:
Broker Name : localBroker
Broker Type : local
Finally, click the Add Broker button. The new broker is added to the list of available brokers.
Use the "Add" menu in CEP Buckets to add buckets.
Bucket Name (name of the bucket) : StockQuoteAnalyzer
Description (description of the bucket) : This bucket analyzes stock quotes and triggers an event if the last traded amount is greater than 100.
Engine Provider (CEP runtime engine to be used) : DroolsFusionCEPRuntime [Choose from the drop-down]

Defining Inputs : Click the Add Input link to open a form for defining inputs. You can define the sample input by entering the following values:
Topic (topic on which events are received) : AllStockQuotes
Broker Name (broker to be used) : localBroker
Mapping Stream (name of the event stream) : allStockQuotes
-Defining XPaths
XpathDefinition (XPath prefixes and NS) - Prefix : quotedata
XpathDefinition (XPath prefixes and NS) - Namespace : http://ws.cdyne.com/
**Note : Click the Add button to add the defined XPath. It appears in the XPath definitions table once it is added.
-Defining input properties (these properties are extracted from the received XML event and fed to the CEP engine)
Property Name : symbol
xpath : //quotedata:StockQuoteEvent/quotedata:StockSymbol
type : java.lang.String [Choose from the drop-down]
**Note : Click the Add button to add the defined property to the input. It appears in the Properties table once it is added.
Property Name : price
xpath : //quotedata:StockQuoteEvent/quotedata:LastTradeAmount
type : java.lang.Double [Choose from the drop-down]
**Note : Click the Add button to add the defined property to the input. It appears in the Properties table once it is added.
*** Note : After filling in all the required fields, click the Add button to add the input to the bucket. The input form then disappears and the added input appears in the Inputs table.
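To make the role of the XPath definition and the input properties more concrete, the following is a minimal standalone sketch of how the two property XPaths could be evaluated against an incoming event with the standard Java XPath API. It is not the CEP Server's own implementation; the class name InputPropertySketch and the shortened event in main are assumptions made for illustration.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of the CEP Server.
public class InputPropertySketch {

    // Binds the "quotedata" prefix to the namespace given in the XPath definition.
    private static final NamespaceContext QUOTE_NS = new NamespaceContext() {
        @Override
        public String getNamespaceURI(String prefix) {
            return "quotedata".equals(prefix) ? "http://ws.cdyne.com/" : XMLConstants.NULL_NS_URI;
        }

        @Override
        public String getPrefix(String namespaceURI) {
            return "http://ws.cdyne.com/".equals(namespaceURI) ? "quotedata" : null;
        }

        @Override
        public Iterator<String> getPrefixes(String namespaceURI) {
            String prefix = getPrefix(namespaceURI);
            return prefix == null
                    ? Collections.<String>emptyIterator()
                    : Collections.singletonList(prefix).iterator();
        }
    };

    // Evaluates one property XPath against an XML event and returns its text value.
    public static String extract(String xmlEvent, String xpathExpression) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed so prefixed XPath steps can match
            Document document = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xmlEvent)));
            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(QUOTE_NS);
            return xpath.evaluate(xpathExpression, document);
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate " + xpathExpression, e);
        }
    }

    public static void main(String[] args) {
        // Shortened version of the sample event, keeping only the two extracted fields.
        String event = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">"
                + "<quotedata:StockQuoteEvent>"
                + "<quotedata:StockSymbol>MSFT</quotedata:StockSymbol>"
                + "<quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>"
                + "</quotedata:StockQuoteEvent>"
                + "</quotedata:AllStockQuoteStream>";
        String symbol = extract(event, "//quotedata:StockQuoteEvent/quotedata:StockSymbol");
        Double price = Double.valueOf(
                extract(event, "//quotedata:StockQuoteEvent/quotedata:LastTradeAmount"));
        System.out.println("symbol=" + symbol + ", price=" + price);
    }
}
```

The sketch shows why the prefix and namespace must be registered before the properties are defined: without the binding for "quotedata", the prefixed steps in the two XPaths could not be resolved.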

Query Name (to identify the query) : ConditionalStocksDetector
**Note : There are two ways to provide a query expression.
Expression (can be typed inline or loaded from the registry):
package org.wso2.carbon.cep.fusion;
import java.util.HashMap;
global org.wso2.carbon.cep.fusion.listener.FusionEventListener fusionListener;
declare HashMap
    @role( event )
end
rule Invoke_Stock_Quotes
when
    $stockQuote : HashMap($symbol : this["symbol"], $stockPrice : this["price"], this["picked"] != "true") over
        window:time(2m) from entry-point "allStockQuotes";
    eval((Double)$stockPrice > 100);
then
    $stockQuote.put("picked","true");
    update($stockQuote);
    HashMap $fastMovingStock = new HashMap();
    $fastMovingStock.put("price",$stockPrice);
    $fastMovingStock.put("symbol",$symbol);
    fusionListener.onEvent($fastMovingStock);
end
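For readers who are not familiar with rule syntax, the condition and consequence of the rule above can be restated in plain Java. This is only a sketch of the rule's logic for a single event: it deliberately ignores the two-minute time window and the entry point, and the class name RuleLogicSketch and the method name apply are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical restatement of the rule logic; it does not use the rule engine.
public class RuleLogicSketch {

    // Returns the output event for a matching quote, or null when the rule would not fire.
    public static Map<String, Object> apply(Map<String, Object> stockQuote) {
        Object price = stockQuote.get("price");
        boolean alreadyPicked = "true".equals(stockQuote.get("picked"));
        // Condition: not yet picked, and the last traded price is greater than 100.
        if (alreadyPicked || !(price instanceof Double) || (Double) price <= 100) {
            return null;
        }
        // Consequence: mark the input so it is not picked again, then build the output event.
        stockQuote.put("picked", "true");
        Map<String, Object> fastMovingStock = new HashMap<>();
        fastMovingStock.put("price", price);
        fastMovingStock.put("symbol", stockQuote.get("symbol"));
        return fastMovingStock;
    }

    public static void main(String[] args) {
        double[] prices = {99.55, 101.36, 99.98};
        for (double price : prices) {
            Map<String, Object> event = new HashMap<>();
            event.put("symbol", "MSFT");
            event.put("price", price);
            Map<String, Object> out = apply(event);
            System.out.println(price + " : " + (out == null ? "no output event" : "output event " + out));
        }
    }
}
```

Note that the output event carries only the "price" and "symbol" entries, which are the two values available to the output definition.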
Defining the Output :
Here we define the topic to which filtered events are published and the structure of the output XML event.
Topic (topic to which filtered events are published) : ConditionSatisfyingStockQuotes
Broker Name (broker to be used) : localBroker
**Note : There are two possible ways to define the output event.
XML Mapping (output XML element; parameters to be replaced with filtered values are given in curly brackets) :
<quotedata:StockQuoteDataEvent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:quotedata="http://ws.cdyne.com/">
<quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>
<quotedata:LastTradeAmount>{price}</quotedata:LastTradeAmount>
</quotedata:StockQuoteDataEvent>
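The curly-bracket placeholders in the mapping can be pictured as a simple text substitution. The sketch below is an illustration under that assumption, not the server's actual mapping code; the class name OutputMappingSketch and the method name fill are hypothetical, and the sketch performs no XML escaping of the inserted values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of placeholder substitution in an output mapping.
public class OutputMappingSketch {

    // Replaces each {name} placeholder in the template with the matching event value.
    public static String fill(String template, Map<String, Object> event) {
        String result = template;
        for (Map.Entry<String, Object> entry : event.entrySet()) {
            result = result.replace("{" + entry.getKey() + "}", String.valueOf(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        String template =
                "<quotedata:StockQuoteDataEvent xmlns:quotedata=\"http://ws.cdyne.com/\">"
                + "<quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>"
                + "<quotedata:LastTradeAmount>{price}</quotedata:LastTradeAmount>"
                + "</quotedata:StockQuoteDataEvent>";
        // The two values produced by the query for a matching quote.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("symbol", "MSFT");
        event.put("price", 101.36);
        System.out.println(fill(template, event));
    }
}
```

Placeholders that have no matching entry in the event are left unchanged by this sketch, which makes a misspelled parameter name easy to spot in the output.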
*** Note : After filling in all the required fields of the query, click the Add Query button to add the query to the bucket. The query form then disappears and the added query appears in the Queries table.
**** Note : As the last step of adding a bucket, click the Save button. You are then redirected to the deployed buckets page, where the deployed bucket appears in the buckets table. To check whether you have entered the details correctly, click the link for the bucket to view the added details. You can return to the previous page by clicking the Back button, or, if you need to edit the bucket, click the "Edit" link at the top of the View Bucket page.

When the bucket is successfully deployed, an Axis2 service is automatically created and named after the broker. In this case the name of the service is 'localBrokerService'. This service can be used to send the input event stream to the CEP engine.
When events are sent to the CEP engine through this service, there must be a subscriber to the output topic (the one given when configuring the query of the bucket) to receive the filtered events from the complex event processing engine. So, before sending events to the engine, we need to deploy an Axis2 service that prints the received string to the console, and then create a subscription to the output topic with the URL of that Axis2 service as the subscription URL.
You can find the source of the sample Axis2 service "FastMovingStockQuoteService" in the location : /samples/services/FastMovingStockQuoteReceiverService
Go into this folder and execute the following command : "ant"
This places the "FastMovingStockQuoteService.aar" file in the location:
/repository/deployment/server/axis2services/
You will then be able to see the Axis2 service in the services list.
Now create a topic with the same name you provided in the output, and subscribe to it by providing the Event Sink URL.
In Stratos mode, topic creation is the same except for the "Event Sink URL".
Here the Event Sink URL should point to the message box you created above.
This URL is formed as follows : sqs://loggedInUserName+/+MessageBoxName
If the username is "foo" and the message box name is "bar", the Event Sink URL is as follows:
sqs://foo/bar
So provide the Event Sink URL accordingly.
Specifically, if you have logged in as shammi@brs.com and your message box name is "FastMovingStocks", the Event Sink URL would be:
sqs://shammi/FastMovingStocks
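The two examples above can be captured in a small helper. This is a sketch only: the class and method names are hypothetical, and dropping the part of the user name after "@" is inferred from the shammi@brs.com example rather than from a documented rule.

```java
// Hypothetical helper that builds an Event Sink URL for a message box.
public class EventSinkUrlSketch {

    // Builds sqs://<user>/<messageBox>; assumes the domain part of the login name is dropped.
    public static String eventSinkUrl(String loggedInUserName, String messageBoxName) {
        int at = loggedInUserName.indexOf('@');
        String user = at >= 0 ? loggedInUserName.substring(0, at) : loggedInUserName;
        return "sqs://" + user + "/" + messageBoxName;
    }

    public static void main(String[] args) {
        System.out.println(eventSinkUrl("foo", "bar"));
        System.out.println(eventSinkUrl("shammi@brs.com", "FastMovingStocks"));
    }
}
```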
You can test whether this works by publishing an XML element in the publish section and checking the message box.
In order to subscribe, you need to create a topic.
Step 1: Click the "Add" menu item under the "Topics" menu in the Manage section of the left panel.

Step 2: Specify the topic name in the text box provided (in this case "ConditionSatisfyingStockQuotes")
and click the 'Add Topic' button. This adds the topic to the server and directs you to the
Topic Browser page.

Step 3: Once you click the topic in the Topic Browser page, you will see four links as in the above image.
Click the Subscribe link and you will be directed to the Subscribe page.

Step 4: Create a subscription with the following details. Once you click the "Subscribe" button, you will be
directed to the Topic Browser page.
topic : ConditionSatisfyingStockQuotes (output topic)
subscription mode : Topic only
subscription URL : http://localhost:9763/services/FastMovingStockQuoteService/getOMElement
expiration time : select a future date from the calendar
Step 5: You can verify that you have subscribed to the topic correctly by clicking the "Details" link of that topic
in the Topic Browser page. You will be directed to the "Topic Details" page, where
you will find all the subscriptions for that topic and its children (if any) and the permissions on that topic.
In addition, in the publish section you can publish a test XML message to that topic and check whether
it is received at your subscription URL.

There are two possible ways to send input events using this Axis2 service.
The following class can be used to send events to the Axis2 service.
To run this class, you need the library files of the Axis2 and Axiom binary distributions.
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.ServiceClient;
import javax.xml.stream.XMLStreamException;
public class SampleClient {
public static void main(String[] args) {
ServiceClient serviceClient = null;
try {
serviceClient = new ServiceClient();
serviceClient.setTargetEPR(new EndpointReference("http://localhost:9763/services/localBrokerService/AllStockQuotes"));
} catch (AxisFault axisFault) {
axisFault.printStackTrace();
}
if (serviceClient != null) {
String xmlElement1 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
" <quotedata:StockQuoteEvent>\n" +
" <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
" <quotedata:LastTradeAmount>99.55</quotedata:LastTradeAmount>\n" +
" <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
" <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
" <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
" <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
" <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
" <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
" <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
" <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
" <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
" <quotedata:PE>10.88</quotedata:PE>\n" +
" <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
" <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
" </quotedata:StockQuoteEvent>\n" +
" </quotedata:AllStockQuoteStream>";
String xmlElement2 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
" <quotedata:StockQuoteEvent>\n" +
" <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
" <quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>\n" +
" <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
" <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
" <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
" <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
" <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
" <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
" <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
" <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
" <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
" <quotedata:PE>10.88</quotedata:PE>\n" +
" <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
" <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
" </quotedata:StockQuoteEvent>\n" +
" </quotedata:AllStockQuoteStream>";
String xmlElement3 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n" +
" <quotedata:StockQuoteEvent>\n" +
" <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n" +
" <quotedata:LastTradeAmount>99.98</quotedata:LastTradeAmount>\n" +
" <quotedata:StockChange>0.05</quotedata:StockChange>\n" +
" <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n" +
" <quotedata:DayHigh>25.46</quotedata:DayHigh>\n" +
" <quotedata:DayLow>25.01</quotedata:DayLow>\n" +
" <quotedata:StockVolume>20452658</quotedata:StockVolume>\n" +
" <quotedata:PrevCls>25.31</quotedata:PrevCls>\n" +
" <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n" +
" <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n" +
" <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n" +
" <quotedata:PE>10.88</quotedata:PE>\n" +
" <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n" +
" <quotedata:QuoteError>false</quotedata:QuoteError>\n" +
" </quotedata:StockQuoteEvent>\n" +
" </quotedata:AllStockQuoteStream>";
OMElement omElement1 = null;
OMElement omElement2 = null;
OMElement omElement3 = null;
try {
omElement1 = AXIOMUtil.stringToOM(xmlElement1);
omElement2 = AXIOMUtil.stringToOM(xmlElement2);
omElement3 = AXIOMUtil.stringToOM(xmlElement3);
serviceClient.fireAndForget(omElement1);
serviceClient.fireAndForget(omElement2);
serviceClient.fireAndForget(omElement3);
} catch (XMLStreamException e) {
e.printStackTrace();
} catch (AxisFault axisFault) {
axisFault.printStackTrace();
}
}
}
}
When you run this class, and if you have subscribed correctly to the output topic of the query, you will see the filtered events in the console as below.
<xmlmapping><quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
<quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>
<quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
</quotedata:StockQuoteDataEvent></xmlmapping>
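The three event strings in the sample client differ only in their LastTradeAmount value, so a small helper could build them instead of repeating the full XML. The sketch below is hypothetical (the class name QuoteEventBuilderSketch and the method name quoteEvent are not part of the sample), and it assumes that an event carrying only StockSymbol and LastTradeAmount is sufficient because those are the only two input properties defined for the bucket; this has not been verified against a running server.

```java
// Hypothetical helper that builds a reduced stock quote event for testing.
public class QuoteEventBuilderSketch {

    // Builds an event with only the two elements read by the input property XPaths.
    public static String quoteEvent(String symbol, double lastTradeAmount) {
        return "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">"
                + "<quotedata:StockQuoteEvent>"
                + "<quotedata:StockSymbol>" + symbol + "</quotedata:StockSymbol>"
                + "<quotedata:LastTradeAmount>" + lastTradeAmount + "</quotedata:LastTradeAmount>"
                + "</quotedata:StockQuoteEvent>"
                + "</quotedata:AllStockQuoteStream>";
    }

    public static void main(String[] args) {
        // Same prices as the three events in the sample client.
        double[] prices = {99.55, 101.36, 99.98};
        for (double price : prices) {
            System.out.println(quoteEvent("MSFT", price));
        }
    }
}
```

Each returned string could then be converted with AXIOMUtil.stringToOM and sent with fireAndForget, exactly as the sample client does with its hand-written strings.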
Invoking the deployed bucket via the Publish feature can be tested as follows.
To send a message to a topic via the Publish feature, you need to create a topic and a subscription.
Step 01: Create a topic
Here you can provide the topic name : AllStockQuotesTopic
Step 02: Create a subscription to the created topic
Here we use this topic to send messages to the input topic, so the Event Sink URL in this case is the URL of the input topic.
Event Sink URL : http://localhost:9763/services/localBrokerService/AllStockQuotes
Subscription Mode : Topic Only
Click the 'Subscribe' button. This creates the subscription and directs you to the "Topic Browser" page.
Step 03: Publish messages to the topic
Sample Inputs : Use the following XML elements ONE by ONE to invoke the service.
<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
<quotedata:StockQuoteEvent>
<quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
<quotedata:LastTradeAmount>99.55</quotedata:LastTradeAmount>
<quotedata:StockChange>0.05</quotedata:StockChange>
<quotedata:OpenAmount>25.05</quotedata:OpenAmount>
<quotedata:DayHigh>25.46</quotedata:DayHigh>
<quotedata:DayLow>25.01</quotedata:DayLow>
<quotedata:StockVolume>20452658</quotedata:StockVolume>
<quotedata:PrevCls>25.31</quotedata:PrevCls>
<quotedata:ChangePercent>0.20</quotedata:ChangePercent>
<quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
<quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
<quotedata:PE>10.88</quotedata:PE>
<quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
<quotedata:QuoteError>false</quotedata:QuoteError>
</quotedata:StockQuoteEvent>
</quotedata:AllStockQuoteStream>
<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
<quotedata:StockQuoteEvent>
<quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
<quotedata:LastTradeAmount>101.36</quotedata:LastTradeAmount>
<quotedata:StockChange>0.05</quotedata:StockChange>
<quotedata:OpenAmount>25.05</quotedata:OpenAmount>
<quotedata:DayHigh>25.46</quotedata:DayHigh>
<quotedata:DayLow>25.01</quotedata:DayLow>
<quotedata:StockVolume>20452658</quotedata:StockVolume>
<quotedata:PrevCls>25.31</quotedata:PrevCls>
<quotedata:ChangePercent>0.20</quotedata:ChangePercent>
<quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
<quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
<quotedata:PE>10.88</quotedata:PE>
<quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
<quotedata:QuoteError>false</quotedata:QuoteError>
</quotedata:StockQuoteEvent>
</quotedata:AllStockQuoteStream>
<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
<quotedata:StockQuoteEvent>
<quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
<quotedata:LastTradeAmount>99.98</quotedata:LastTradeAmount>
<quotedata:StockChange>0.05</quotedata:StockChange>
<quotedata:OpenAmount>25.05</quotedata:OpenAmount>
<quotedata:DayHigh>25.46</quotedata:DayHigh>
<quotedata:DayLow>25.01</quotedata:DayLow>
<quotedata:StockVolume>20452658</quotedata:StockVolume>
<quotedata:PrevCls>25.31</quotedata:PrevCls>
<quotedata:ChangePercent>0.20</quotedata:ChangePercent>
<quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
<quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
<quotedata:PE>10.88</quotedata:PE>
<quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
<quotedata:QuoteError>false</quotedata:QuoteError>
</quotedata:StockQuoteEvent>
</quotedata:AllStockQuoteStream>

You will get the same output in the console as with the previous test method.
EsperTM is a registered trademark of EsperTech.