
Achieve Scalability and Performance With Partitioning In WSO2 CEP

  • By Sachini Jayasekara
  • 9 Jan, 2015

WSO2 CEP analyzes events and acts on them in real time, and its backend runtime engine is Siddhi. Financial analysis, fraud detection and monitoring are some of the use cases where CEP has been applied. In such scenarios, performance and scalability are key concerns when dealing with high volumes of events, high event rates, or both. Partitioning is one technique for achieving both scalability and performance. This article explains how partitioning is used in WSO2 Siddhi and how partitions help analyze events to extract important information.

Table of contents

  1. Introduction
  2. Partition architecture
  3. Implementation differences and improvements with respect to previous versions
  4. Samples
  5. Conclusion

Introduction

Siddhi is the backend runtime engine of WSO2 CEP. At a very high level, Siddhi receives events, processes them based on the queries defined, and delivers output events via callbacks. While Siddhi queries identify events that match a given condition or pattern, Siddhi partitions divide events into several groups based on some condition before a query is evaluated, so that the query is evaluated separately, and in parallel, for each partition. Partitions are important when meaningful information can only be obtained by partitioning events on a particular key rather than processing all the events as a whole. For instance, patterns in stock prices are meaningful only when they are matched over stock quotes of the same symbol.

In Siddhi, a partition is defined as a logical container that processes a subset of the incoming events with the queries defined inside it, where the subset is selected by a predefined rule of separation. Partitioning is important for several reasons:

  1. It allows parallel processing, which increases performance and scalability
  2. For some scenarios, meaningful results can only be generated after partitioning
  3. It makes queries easier to design

Partition types

Siddhi supports two types of partition definitions: partitions based on a variable and partitions based on a range.

  1. Variable-based partition: events are partitioned by the value of a specified attribute (variable), so each distinct value forms its own partition
  2. Range-based partition: events are partitioned according to the range into which the value of a specified attribute falls. A partition key must be provided for each range.

For a stream named StockStream (symbol string, price float, volume int), a variable-based partition can be defined as follows:

partition with (symbol of StockStream)

A range-based partition can be defined as follows, where events with a volume of 10 or more are assigned the partition key 'LARGE':

partition with (volume >= 10 as 'LARGE' of StockStream)
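To make the difference between the two partition types concrete, the following plain-Java sketch derives partition keys for StockStream events. It is not Siddhi code: the class PartitionKeys, the StockEvent and Range records and the methods variableKey and rangeKeys are hypothetical names chosen for illustration. The sketch also assumes that an event satisfying several ranges is assigned every matching key and that an event matching no range belongs to no partition; the article does not describe either case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical sketch (not Siddhi's API) of how a partition key could be derived
 * for an event of StockStream (symbol string, price float, volume int).
 */
public class PartitionKeys {

    /** Minimal stand-in for one StockStream event. */
    record StockEvent(String symbol, float price, int volume) {}

    /** One range of a range-based partition: a condition and the key it maps to. */
    record Range(Predicate<StockEvent> condition, String key) {}

    /** Variable-based partition: the attribute value itself is the partition key. */
    static String variableKey(StockEvent event) {
        return event.symbol();
    }

    /**
     * Range-based partition: the keys of all ranges the event satisfies.
     * Assumption: overlapping ranges yield several keys, and an empty result
     * means the event is not processed by any partition instance.
     */
    static List<String> rangeKeys(StockEvent event, List<Range> ranges) {
        List<String> keys = new ArrayList<>();
        for (Range range : ranges) {
            if (range.condition().test(event)) {
                keys.add(range.key());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // A single range: volume >= 10 is mapped to the key 'LARGE'.
        List<Range> ranges = List.of(new Range(e -> e.volume() >= 10, "LARGE"));

        StockEvent ibm = new StockEvent("IBM", 75f, 100);
        StockEvent small = new StockEvent("WSO2", 705f, 5);

        System.out.println(variableKey(ibm));         // IBM
        System.out.println(rangeKeys(ibm, ranges));   // [LARGE]
        System.out.println(rangeKeys(small, ranges)); // []
    }
}
```

A variable-based partition creates a new key for every distinct value it sees, while a range-based partition can only ever produce the keys listed in its definition.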

Queries are placed between the begin and end keywords that follow the partition definition, with each query terminated by a semicolon:

partition with (symbol of StockStream)    ← partition definition
begin
    from StockStream                       ← queries
    select symbol, price
    insert into OutputStockStream;
end;

Partition architecture

When a query defined inside a partition is given to Siddhi, Siddhi creates a query runtime for that query and keeps it as a meta query runtime model. Whenever a new partition key is received, a new query runtime for that key is generated from the meta query runtime. If a partition has several queries, the meta query runtime models of all of them are kept as a list.

At a high level, an incoming event first goes to a partition stream receiver. The partition stream receiver generates the partition key for the event, duplicates all the meta query runtimes, and creates a partition instance runtime that contains the duplicated query runtimes. This duplication and creation happen only if no partition instance runtime exists yet for that partition key; otherwise, the existing partition instance runtime for the key is used to process the event.

The query runtime duplication process does not construct query runtimes from scratch. Instead, it takes the meta query runtime as a reference and clones its components, generating the new query runtime with less work and in minimal time.
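The flow described above can be sketched in plain Java. The names PartitionStreamReceiver, PartitionInstanceRuntime and QueryRuntime echo the article's terminology but are hypothetical and are not Siddhi's actual classes. The copy() method stands in for Siddhi's cloning of a meta query runtime, the example query is a running average, and the sketch is single-threaded and partitions only on symbol.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: meta query runtimes are cloned once per new partition key.
 * Names mirror the article's terms; they are NOT Siddhi's classes.
 */
public class PartitionSketch {

    /** A query runtime that can produce a fresh copy of itself. */
    interface QueryRuntime {
        void process(String symbol, double price);
        QueryRuntime copy();  // clone of the meta model with empty state
    }

    /** Example query: running average of price, written to an output sink. */
    static final class AvgQuery implements QueryRuntime {
        private final Consumer<String> output;  // configuration shared by all clones
        private double sum;                      // per-instance state
        private long count;

        AvgQuery(Consumer<String> output) {
            this.output = output;
        }

        public void process(String symbol, double price) {
            sum += price;
            count++;
            output.accept(symbol + "," + (sum / count));
        }

        public QueryRuntime copy() {
            return new AvgQuery(output);  // reuse configuration, start with fresh state
        }
    }

    /** Holds the query runtimes created for one partition key. */
    record PartitionInstanceRuntime(List<QueryRuntime> queries) {}

    /** Derives the key and reuses or creates the partition instance runtime. */
    static final class PartitionStreamReceiver {
        private final List<QueryRuntime> metaQueryRuntimes;  // templates, never process events
        private final Map<String, PartitionInstanceRuntime> instances = new HashMap<>();

        PartitionStreamReceiver(List<QueryRuntime> metaQueryRuntimes) {
            this.metaQueryRuntimes = metaQueryRuntimes;
        }

        void receive(String symbol, double price) {
            // Variable-based partition on symbol: the attribute value is the key.
            PartitionInstanceRuntime instance = instances.computeIfAbsent(symbol, key -> {
                List<QueryRuntime> clones = new ArrayList<>();
                for (QueryRuntime meta : metaQueryRuntimes) {
                    clones.add(meta.copy());  // runs only for a key not seen before
                }
                return new PartitionInstanceRuntime(clones);
            });
            for (QueryRuntime query : instance.queries()) {
                query.process(symbol, price);
            }
        }

        int instanceCount() {
            return instances.size();
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        PartitionStreamReceiver receiver =
                new PartitionStreamReceiver(List.of(new AvgQuery(out::add)));
        receiver.receive("IBM", 75);
        receiver.receive("WSO2", 705);
        receiver.receive("IBM", 35);
        receiver.receive("ORACLE", 50);
        System.out.println(out);                       // [IBM,75.0, WSO2,705.0, IBM,55.0, ORACLE,50.0]
        System.out.println(receiver.instanceCount());  // 3
    }
}
```

Using computeIfAbsent keeps the "create only for an unseen key, otherwise reuse" rule in one place; adding a second meta query to the list would give every key its own copy of both queries.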

Figure 1

As shown in Figure 1, a partition runtime contains one partition instance runtime per partition key. Each partition instance runtime holds the query runtimes of all the queries defined in the partition, created by cloning the meta model.

There can also be scenarios where a partition contains more than one query. Below is one such case.

partition with (symbol of cseEventStream)
begin
    from cseEventStream
    select symbol, avg(price) as avgPrice, volume
    insert into OutStockStream;

    from cseEventStream
    select symbol, sum(price) as sumPrice, volume
    insert into SumOutStockStream;
end;

If a partition consists of several queries, each query is duplicated for each partition key, as shown in Figure 2.

Figure 2

Further analysis

To understand how partitions work internally, first consider how a Siddhi query works. A Siddhi query is a logical construct that derives new streams by combining existing streams. A query consists of one or more input streams, processors (such as a filter processor) that operate on those input streams, and an output stream to which it publishes its output events.

Figure 3

In Siddhi, incoming events of a particular event stream flow to that stream's 'Stream Junction'. A Stream Junction is responsible for sending the events to all the subscribers registered to that event stream. Each Stream Junction has a disruptor (https://github.com/LMAX-Exchange/disruptor/wiki/Introduction) to which events are published, and the subscribers act as the consumers of the disruptor. The disruptor lets each subscriber consume events in parallel, which increases performance.

There are two types of subscribers in Siddhi:

  • Stream Callback: notifies the user of event occurrences on a particular stream
  • Stream Receiver: receives events and processes them according to a particular query

Once a query finishes processing, its output is sent to the registered Query Callback and to the Stream Junction of its output stream, which feeds the event to all the queries and Stream Callbacks registered to that stream.
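The subscriber model can be illustrated with a small, single-threaded Java sketch. StreamJunction, the filter condition price > 50 and the demo() method are hypothetical. In particular, events here are delivered by a plain loop rather than through a disruptor, so the sketch shows only how events are routed, not Siddhi's concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical single-threaded sketch of Stream Junction routing. Siddhi
 * delivers events through a disruptor; this sketch uses a plain loop.
 */
public class JunctionSketch {

    /** Sends every event of one stream to all registered subscribers. */
    static final class StreamJunction {
        private final List<Consumer<Object[]>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Object[]> subscriber) {
            subscribers.add(subscriber);
        }

        void send(Object[] event) {
            for (Consumer<Object[]> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }

    /** Wires one filter query between two streams and records what each party sees. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        StreamJunction input = new StreamJunction();   // e.g. cseEventStream
        StreamJunction output = new StreamJunction();  // e.g. OutStockStream

        // Stream Callback on the input stream: only observes events.
        input.subscribe(e -> seen.add("input callback: " + e[0]));

        // Stream Receiver of a hypothetical filter query (price > 50): matching
        // events go to its Query Callback and to the output stream's junction.
        Consumer<Object[]> queryCallback = e -> seen.add("query callback: " + e[0]);
        input.subscribe(e -> {
            if (((Float) e[1]) > 50f) {
                Object[] out = {e[0], e[1]};
                queryCallback.accept(out);
                output.send(out);
            }
        });

        // Stream Callback on the output stream.
        output.subscribe(e -> seen.add("output callback: " + e[0]));

        input.send(new Object[]{"IBM", 75f, 100});
        input.send(new Object[]{"ORACLE", 35f, 100});
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The IBM event passes the filter and is therefore seen by all three parties, while the ORACLE event reaches only the input stream's callback.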

Inner streams

Figure 4

An inner stream is another concept used in Siddhi partitions. It is a subtype of an event stream. Inner streams reside inside a partition and are visible only to the queries in that partition; queries outside the partition have no knowledge of them. An inner stream allows the same operation to be carried out separately on the results of each partition key, as shown above. If an inner stream is used as the output stream of a query inside a partition (e.g. query1), the events generated for each partition key are directed to a separate stream with the same stream definition.

As shown in Figure 4, stream A is partitioned by a partition key for query1, and operations are carried out separately for each partition key. Using an inner stream, further operations can be carried out on the results of each partition key individually. Queries outside the partition have no knowledge of stream B because it is an inner stream.

Figure 5

As shown in Figure 5, stream junctions send events to their subscribers; in the inner stream case, there is a set of stream junctions that are local to the particular partition.

In Figure 5, the partition is based on the value of symbol (an attribute of stream A), and there are two partition instances: one for symbol A and another for symbol B. The output of query1 goes to an inner stream. When an event arrives at the partition stream receiver, the receiver generates the partition key and sends the event to the local stream junction for that key. Because query1 outputs to inner stream B, its output events go to the local stream junction of inner stream B for the same partition key. There is a separate stream junction for each symbol, which allows the events of each partition key to be processed individually.

In Siddhi query syntax, an inner stream is distinguished by a # prefix, so inner stream B is referred to as #B.

For example:

define stream cseEventStream (symbol string, price float, volume int);

partition with (symbol of cseEventStream)
begin
    from cseEventStream
    select symbol, avg(price) as avgPrice
    insert into #StockStream;

    from #StockStream
    select symbol, max(avgPrice) as price
    insert into OutStockStream;
end;
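The per-key behaviour of this example can be simulated in plain Java. The sketch below is hypothetical: InnerStreamSketch and PartitionInstance are illustrative names, a direct method call stands in for the partition-local stream junction of #StockStream, and avg and max are computed over all events seen so far for the key, since the query above defines no window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of the inner-stream example: each symbol gets its own
 * partition instance whose query 1 feeds only that instance's #StockStream.
 */
public class InnerStreamSketch {

    /** One partition instance: both queries and the key-local #StockStream. */
    static final class PartitionInstance {
        private double sum;                                // query 1 state: avg(price)
        private long count;
        private double maxAvg = Double.NEGATIVE_INFINITY;  // query 2 state: max(avgPrice)

        /** Query 1: average price per symbol, emitted into #StockStream. */
        void onCseEvent(String symbol, double price, List<String> outStockStream) {
            sum += price;
            count++;
            // A direct call stands in for this key's local #StockStream junction.
            onInnerStockStream(symbol, sum / count, outStockStream);
        }

        /** Query 2: reads only this key's #StockStream, emits to OutStockStream. */
        private void onInnerStockStream(String symbol, double avgPrice, List<String> outStockStream) {
            maxAvg = Math.max(maxAvg, avgPrice);
            outStockStream.add(symbol + "," + maxAvg);
        }
    }

    private final Map<String, PartitionInstance> instances = new HashMap<>();

    /** OutStockStream is a regular stream shared by all partition instances. */
    final List<String> outStockStream = new ArrayList<>();

    void send(String symbol, double price) {
        instances.computeIfAbsent(symbol, key -> new PartitionInstance())
                 .onCseEvent(symbol, price, outStockStream);
    }

    public static void main(String[] args) {
        InnerStreamSketch sketch = new InnerStreamSketch();
        sketch.send("IBM", 75);
        sketch.send("WSO2", 705);
        sketch.send("IBM", 35);
        sketch.send("ORACLE", 50);
        System.out.println(sketch.outStockStream);
        // [IBM,75.0, WSO2,705.0, IBM,75.0, ORACLE,50.0]
    }
}
```

Because each key has its own #StockStream, the maximum for IBM is taken only over IBM's averages and is never mixed with WSO2's much higher values.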

Implementation differences and improvements with respect to previous versions

Inner stream support

In previous versions, partition query execution was carried out for each partition key separately, but the results were directed to a single stream. Once the results of a partition query were sent to the output stream, there was no way to identify which partition key an event had been generated from, and therefore no way to carry out further operations on the events of each partition key separately. The inner streams described in the architecture section overcome this limitation: they allow the same operation to be carried out in parallel on the results of each partition key individually.

Disruptors for parallel execution of queries

A disruptor is used in each stream junction, which allows queries to execute in parallel. In previous versions, when there were several queries, they ran one after another, which took relatively more time. Disruptors reduce the time taken to generate output events and help users take proactive action in real time based on the analyzed facts.
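Parallel consumption by several subscribers can be approximated with the standard java.util.concurrent executors. This is a swapped-in technique, not a disruptor: the hypothetical ParallelJunctionSketch gives each subscriber its own single-thread executor, so different subscribers (queries) run concurrently while each one still sees events in the order they were published. It only illustrates the idea and makes no claim about how Siddhi configures its disruptors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of parallel delivery from one stream junction, using one
 * single-thread executor per subscriber instead of an LMAX Disruptor.
 */
public class ParallelJunctionSketch {

    /** A subscriber paired with the thread that delivers events to it. */
    record Subscription(Consumer<Object[]> subscriber, ExecutorService executor) {}

    private final List<Subscription> subscriptions = new ArrayList<>();

    void subscribe(Consumer<Object[]> subscriber) {
        subscriptions.add(new Subscription(subscriber, Executors.newSingleThreadExecutor()));
    }

    /** Hands the event to every subscriber's thread and returns without waiting. */
    void send(Object[] event) {
        for (Subscription s : subscriptions) {
            s.executor().execute(() -> s.subscriber().accept(event));
        }
    }

    /** Lets queued events drain, then stops all delivery threads. */
    void shutdown() {
        for (Subscription s : subscriptions) {
            s.executor().shutdown();
        }
        try {
            for (Subscription s : subscriptions) {
                s.executor().awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ParallelJunctionSketch junction = new ParallelJunctionSketch();
        List<Object> queryA = Collections.synchronizedList(new ArrayList<>());
        List<Object> queryB = Collections.synchronizedList(new ArrayList<>());
        junction.subscribe(e -> queryA.add(e[0]));  // e.g. the avg query
        junction.subscribe(e -> queryB.add(e[0]));  // e.g. the sum query

        junction.send(new Object[]{"IBM", 75f, 100});
        junction.send(new Object[]{"WSO2", 705f, 100});
        junction.shutdown();

        // Each subscriber saw both events, in publish order.
        System.out.println(queryA + " " + queryB);  // [IBM, WSO2] [IBM, WSO2]
    }
}
```

A single-thread executor per subscriber is the simplest way to get "parallel across subscribers, ordered within a subscriber"; a disruptor achieves the same property with a pre-allocated ring buffer and far less per-event overhead.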

Cloning

As explained in the architecture section, the required query runtimes, stream junctions and so on are created for each partition key. In previous versions, when a new partition key arrived, the required processors and selectors for each query were generated from scratch, so the same construction logic was executed repeatedly for every key, which consumed more time. The latest Siddhi engine instead duplicates the required processors by cloning the processors of the original query, doing minimal work rather than building them from scratch. In partitioning scenarios, this reduces the work needed to prepare the processors for each partition key when an event arrives at a partitioned query.

Samples

Simple partition sample

  

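import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.EventPrinter;

public class SimplePartitionSample {

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        // Partition cseEventStream by symbol and compute the average price per symbol.
        String executionPlan = "define stream cseEventStream (symbol string, price float, volume int); " +
                "partition with (symbol of cseEventStream) " +
                "begin " +
                "    from cseEventStream select symbol, avg(price) as avgPrice, volume insert into OutStockStream; " +
                "end;";

        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

        // Print every event that reaches OutStockStream.
        StreamCallback streamCallback = new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
            }
        };
        executionPlanRuntime.addCallback("OutStockStream", streamCallback);

        InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
        executionPlanRuntime.start();
        inputHandler.send(new Object[]{"IBM", 75f, 100});
        inputHandler.send(new Object[]{"WSO2", 705f, 100});
        inputHandler.send(new Object[]{"IBM", 35f, 100});
        inputHandler.send(new Object[]{"ORACLE", 50.0f, 100});

        // Give the engine time to deliver the output events, then release its resources.
        Thread.sleep(100);
        executionPlanRuntime.shutdown();
    }
}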


Here, the events of cseEventStream are partitioned by symbol, and for each event the query outputs the symbol, the average price of the events received so far with that symbol, and the volume.

Input:

[IBM, 75f, 100]
[WSO2, 705f, 100]
[IBM, 35f, 100]
[ORACLE, 50.0f, 100]

Output:

[IBM, 75.0, 100]
[WSO2, 705.0, 100]
[IBM, 55.0, 100]
[ORACLE, 50.0, 100]

Partition sample with an inner stream

  
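import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.EventPrinter;

public class InnerStreamPartitionSample {

    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();

        // Query 1 writes per-symbol averages to the inner stream #StockStream;
        // query 2 reads #StockStream within the same partition and keeps the maximum.
        String executionPlan = "define stream cseEventStream (symbol string, price float, volume int); " +
                "partition with (symbol of cseEventStream) " +
                "begin " +
                "    from cseEventStream select symbol, avg(price) as avgPrice insert into #StockStream; " +
                "    from #StockStream select symbol, max(avgPrice) as price insert into OutStockStream; " +
                "end;";

        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

        // Print every event that reaches OutStockStream.
        StreamCallback streamCallback = new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                EventPrinter.print(events);
            }
        };
        executionPlanRuntime.addCallback("OutStockStream", streamCallback);

        InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
        executionPlanRuntime.start();
        inputHandler.send(new Object[]{"IBM", 75f, 100});
        inputHandler.send(new Object[]{"WSO2", 705f, 100});
        inputHandler.send(new Object[]{"IBM", 35f, 100});
        inputHandler.send(new Object[]{"ORACLE", 50.0f, 100});

        // Give the engine time to deliver the output events, then release its resources.
        Thread.sleep(100);
        executionPlanRuntime.shutdown();
    }
}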


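Here, the events of cseEventStream are partitioned by symbol, and the first query outputs the symbol and the average price of the events with that symbol into the inner stream #StockStream. The second query then reads #StockStream within the same partition and outputs the maximum average price for each symbol. With the same input as the previous sample, the average price for IBM falls from 75.0 to 55.0 after its second event, so the maximum average price reported for IBM remains 75.0.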

Conclusion

This article explained how partitioning works in Siddhi 3.0.0, focusing mainly on the partition architecture, samples, and the syntax used for partitions. New concepts introduced in Siddhi 3.0.0, such as inner streams, extend partition functionality and allow users to perform more complex and thorough analysis of events.

About Author

  • Sachini Jayasekara
  • Software Engineer
  • WSO2