Bringing Complex Event Processing to SOA

Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to relevant product page for more up-to-date product information and resources.
  • By Amila Suriarachchi
  • 15 Aug, 2011

Introduction

Software systems generate events according the various actions performed by the users. These events can contain data related to purchase order transactions, user login details, stock quote price changes etc. Out of these events some events, patterns of event occurring or combination of some event patterns can be more useful and hence required to take special actions. Hence this process of deriving other events by analyzing the real time systems called complex event processing (CEP). For an instance a stock quote data may send the data about all the companies with the latest price periodically. But an application may need the stocks of the companies which increased by 2% compared to average price of the last hour. For this scenario we can use complex event processing to relate the historical data with the current event and derive new events. Not only one event stream but we can also relate many event streams and can derive new events as well.

Complex event processing engines such as Drools Fusion[1] and Esper™[2] provides the CEP functionality as a library. These CEP engines has a concept of stream to push the events but those events are java objects with the relevant properties. However in a software system design with service oriented architecture (SOA) contains the messages in xml format and the CEP servers should be able to subscribe to such event sources and publish the generated events to other end points for further processing. WSO2 Complex Event Processing server can be used to create a CEP processing engine with all of the above capabilities. It by default shift with the Drools fusion[1] CEP engine and Esper™[2] is available as a P2 carbon feature.

WSO2 CEP server introduces a concept of CEP bucket to make a better abstraction model for such an event processing. Hence first this article gives a description about this model and uses a sample scenario to describe it further. Since WSO2 CEP server is based on the WSO2 Carbon Platform, this functionality is implemented as a carbon component. Therefore final part of article describes the architecture of the CEP carbon component and related broker component to explain the extensibility and its flexibility to plug different CEP engines and many types of event brokers.

Applies To

WSO2 CEP 1.0.0

Contents

CEP abstract model

CEP Abstract Model

WSO2 CEP Abstract model is based on the concept of CEP Bucket which has a set of Inputs to receive events and set of queries to process the events.

An Input maps to an event stream. So within an input it defines the Broker and topic to be subscribed to receive events. Then it needs to send this events to the underlying CEP engine which can either be Fusion[1] or Esper™[2]. When sending the events to the CEP engine, CEP component sends the event as a java HashMap. Therefore the Input has a Mappings section to define a set of properties of which name become the key, and xpath expressions to extract the value from the xml document.

CEP Bucket can have many Queries to process the input streams. A Query may or may not produce an Output. Then again CEP component assume this output is in the form of a java Hash Map and users can define either Element Mapping or an XML Mapping to convert this Hash Map to an xml message. Output can also define the broker and the topic to publish the generated new events.

Sample Scenario

Lets take a scenario where a stock broker wants to get a special action on the stock prices of which value is varied 2% from the average value for that company over last 10 minits[3]. He can subscribe to a WS-Event broker end point to receive stock quote events and publish back the generated events to a new topic in another broker.

Esper™ Query

            select symbol, avg(price) , price from allStockQuotes.win:time(10 min)
            group by symbol having ((price > (avg(price)*1.02)) or  ((avg(price)*0.98)>price ))

Esper™ query language is similar to SQL and it supports sliding windows so that users can write queries with the past time windows. In this example allStockQuotes is an event stream of which there are objects with properties price and symbol.

Broker Configuration

With WSO2 CEP server, users can define their external brokers in a different broker manager component and use those broker names in the bucket configuration. Currently it supports three broker types called WS-Event, local and JMS and for this scenario we can define an WS-Event broker as follows.

  <brokerConfiguraton name="wsEventBroker" type="ws-event">
	 <property name="uri">https://localhost:9444/services/EventBrokerService</property>
	 <property name="username">admin</property>
	 <property name="password">admin</property>
  </brokerConfiguraton>

CEP Bucket Configuration

<bucket name="StockQuoteAnalyzer" engineProvider="EsperCEPRuntime">
    <description>
        This bucket analyzes stock quotes and trigger an event if 
        the last traded amount vary by 2 percent with regards to the 
        average traded price within past 2 minutes.
    </description>
    <input topic="AllStockQuotes" brokerName="wsEventBroker">
        <mapping stream="allStockQuotes">
            <xpathDefinition prefix="quotedata" namespace="http://ws.cdyne.com/"/>
            <property name="symbol" xpath="//quotedata:StockQuoteEvent/quotedata:StockSymbol" type="java.lang.String"/>
            <property name="price" xpath="//quotedata:StockQuoteEvent/quotedata:LastTradeAmount" type="java.lang.Double"/>
        </mapping>
    </input>
    <query name="FastMovingStocksDetector">
        <expression type="inline">
            select symbol,avg(price),price from allStockQuotes.win:time(1 min)
            group by symbol having ((price > (avg(price)*1.02)) or
            ((avg(price)*0.98)>price ))
        </expression>
        <output topic="FastMovingStockQuotes" brokerName="wsEventBroker">
            <xmlmapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>
                    <quotedata:AvgLastTradeAmount>{avg(price)}
                    </quotedata:AvgLastTradeAmount>
                    <quotedata:LastTradeAmount>{price}</quotedata:LastTradeAmount>
                </quotedata:StockQuoteDataEvent>
            </xmlmapping>
        </output>
    </query>
</bucket>

This bucket is named as 'StockQuoteAnalyzer' and it uses the Esper™[2] as the back end runtime engine. First it subscribes to the 'AllStockQuotes' topic of the broker configuration named as 'wsEventBroker'. For this sample it expect to receive a message given below.

<quotedata:AllStockQuoteStream xmlns:quotedata="http://ws.cdyne.com/">
    <quotedata:StockQuoteEvent>
        <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
        <quotedata:LastTradeAmount>99.55</quotedata:LastTradeAmount>
        <quotedata:StockChange>0.05</quotedata:StockChange>
        <quotedata:OpenAmount>25.05</quotedata:OpenAmount>
        <quotedata:DayHigh>25.46</quotedata:DayHigh>
        <quotedata:DayLow>25.01</quotedata:DayLow>
        <quotedata:StockVolume>20452658</quotedata:StockVolume>
        <quotedata:PrevCls>25.31</quotedata:PrevCls>
        <quotedata:ChangePercent>0.20</quotedata:ChangePercent>
        <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>
        <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>
        <quotedata:PE>10.88</quotedata:PE>
        <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>
        <quotedata:QuoteError>false</quotedata:QuoteError>
    </quotedata:StockQuoteEvent>
</quotedata:AllStockQuoteStream>

The mappings section defines the stream name which is used in the CEP query to process the events, and the property names and the xpath expressions to get the values from the xml. Property names are used with the query expressions to process the messages.

The Query is the most important part of the bucket. It is written in a underlying CEP engine specific manner and this example uses an Esper™ query. If there is an output generated from the query then it is published to ' FastMovingStockQuotes' topic of the broker configuration with the name ' wsEventBroker'. CEP component receives the event as a HashMap. Then the given xml message in the xmlmappings is used to produce the xml message out put.

WSO2 CEP architecture

CEP Architecture

WSO2 CEP server mainly consists of three components called cep, broker and broker manager. CEP component manages the CEP buckets, process the messages and invoke the cep engine while broker component subscribes to and publish messages to different types of brokers. Broker manager component is used to keep the different broker configurations which is used by the broker component.

CEP component architecture

The Scenario

As in any other feature in the WSO2 Carbon Platform CEP is also implement as an OSGI service. The implementation service is called the CEPService. CEP service contains a set of CEP buckets which is defined either using the cep_config.xml or administrative user interface. As shown in the diagram users can use the administrative components to create CEP buckets. Administrative console which is written using JSP invokes the CEPAdmin service to store the buckets to back end. CEPAdmin service invokes the OSGI interface (CEPServiceInterface) and CEPService creates the bucket and saves it to the registry using CEPRegistryInvoker. Similarly CEPServiceBuilder is used to create the buckets from the cep-config.xml.

When creating a CEPBucket, first it subscribes to the given broker using the BrokerService OSGI interface. The BrokerService OSGI interface let users to register a callback to receive the messages when subscribes. Therefore CEPBucket registers a TopicEventListener to receive the events. These received events are in xml format and the OMElement Processor is used to create Hash Map objects from that using mappings defined at the input and insert the event to underlying CEPBackEndRuntime through the CEPBucket.

CEP Component abstracts the CEP engine functionality using CEPBackendRuntime interface. Therefore any cep engine can be plugged to cep component by writing a implementation which calls the real cep engine. Currently it supports drools fusion[1] and Esper™[2]. At the CEP bucket creation time CEP bucket registers the cep queries with the CEPBackendRuntime service. When registering the query it registers a CEPEventListener to receive the generated events. CEPEventLister receives the events as HashMaps and convert it to an xml message using CEPEventProcessor. Finally it will publish the message to the topic of the broker configuration given.

Broker Component architecture

The Scenario

The broker component is used to manage the different types of brokers. The BrokerService interface has two methods to publish and subscribe. Both methods take the broker configuration as a parameter and subscribes or publish the messages according to the type. Currently it supports local, WS-Event and JMS as types but can be extended to any broker by implementing a broker type.

Conclusion

Complex Event Processing is a key concept in the event driven applications where real time analysis is required. In order to cater these requirements there are complex event processing engines such as Drools fusion[1] and Esper™[2] has been developed. But most of these CEP engines interact with the system using java objects. However in a SOA environment the events are in xml format and it is required to subscribes to brokers and receive events and publish them back. WSO2 CEP Server provides this functionality by introducing an abstract model to process these xml events in an SOA environment. Therefore this article first elaborate that abstract model and describes it with an example. Then it explain the underlying architecture of the cep and broker components to show the extensibility of those components to different cep engine runtime and the brokers.

References

[1]http://www.jboss.org/drools/drools-fusion

[2] http://docs.codehaus.org/display/ESPER/Home

[3]https://wso2.com/project/cep/1.0.0/docs/samples/stock_quote_esper_ws_event.html

Author

Amila Suriarachchi, Software Architect, WSO2 Inc.

Esper™ is a registered trademark of Esper™Tech.

About Author

  • Amila Suriarachchi
  • Architect, Member, Management Committee - Data Technologies
  • WSO2 Inc.