Understanding How Siddhi Powers WSO2 Complex Event Processor 2.x
By Sriskandarajah Suhothayan
- 10 Jun, 2013
WSO2 Complex Event Processor (WSO2 CEP) is an enterprise-grade server that integrates with various systems to collect and analyze events and to notify users of meaningful patterns in real time. Such CEP systems are widely used in areas such as monitoring, financial analysis, and fraud detection. The core back-end runtime engine behind the WSO2 CEP 2.x server is WSO2 Siddhi. This article takes you through the key functionalities of Siddhi and its architecture. It is aimed at developers, and we hope it will help them better understand the code, fix bugs, and improve Siddhi.
Table of contents
- High-level architecture
- Stream definition
- Siddhi queries
- Basic Siddhi queries
- Join query
- Pattern and sequence query
At a very high level, Siddhi receives incoming events on Event Streams via input handlers, processes them, and notifies the output via callbacks. We use the term Event Stream for a sequence of events that share a definite schema and are logically ordered in time.
High-level architecture of Siddhi
Each event in Siddhi has a Stream ID identifying the Event Stream it belongs to, a timestamp representing the event's creation time, and an Object array containing the event's data attributes.
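As a rough illustration of this event shape, the following Java sketch models an event with the three parts just described. The class name `Event` and its accessors are hypothetical and are not taken from Siddhi's source.

```java
import java.util.Arrays;

// Hypothetical event model: stream ID, creation timestamp, and an Object array
// of attribute values. Illustrative only; not Siddhi's actual Event class.
final class Event {
    private final String streamId;   // the Event Stream this event belongs to
    private final long timestamp;    // event creation time, in milliseconds
    private final Object[] data;     // attribute values, in stream-definition order

    Event(String streamId, long timestamp, Object[] data) {
        this.streamId = streamId;
        this.timestamp = timestamp;
        this.data = data.clone();    // defensive copy so callers cannot mutate the event
    }

    String getStreamId() { return streamId; }
    long getTimestamp() { return timestamp; }
    Object getData(int index) { return data[index]; }
    int size() { return data.length; }

    @Override
    public String toString() {
        return "Event{stream=" + streamId + ", ts=" + timestamp + ", data=" + Arrays.toString(data) + "}";
    }
}
```

Because the attributes are positional, an attribute is located by its index in the stream definition rather than by name.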
To process an event stream in Siddhi, we first have to define that stream:
define stream StockQuartStream (symbol string, price float, volume int);
When defining a stream, we specify its name and its attributes; each attribute is given as a name and type pair, in order.
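One way to picture a stream definition is as a name plus an ordered list of name and type pairs, where an attribute's position gives its index in the event's data array. The `StreamDefinition` class and `Type` enum below are hypothetical, written only to illustrate this idea.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of "define stream Name (attr type, ...)". Not Siddhi's real classes.
final class StreamDefinition {
    enum Type { STRING, INT, LONG, FLOAT, DOUBLE, BOOL }

    static final class Attribute {
        final String name;
        final Type type;
        Attribute(String name, Type type) { this.name = name; this.type = type; }
    }

    private final String id;
    private final List<Attribute> attributes = new ArrayList<>();

    StreamDefinition(String id) { this.id = id; }

    // Builder style: attributes are appended in declaration order.
    StreamDefinition attribute(String name, Type type) {
        attributes.add(new Attribute(name, type));
        return this;
    }

    String getId() { return id; }
    List<Attribute> getAttributes() { return Collections.unmodifiableList(attributes); }

    // Position of an attribute in the event's data array, or -1 if it is not defined.
    int indexOf(String name) {
        for (int i = 0; i < attributes.size(); i++) {
            if (attributes.get(i).name.equals(name)) return i;
        }
        return -1;
    }
}
```

For `StockQuartStream`, this would give `symbol` index 0, `price` index 1, and `volume` index 2.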
Note: In WSO2 CEP 2.x, users are not given the option to define an event stream explicitly; instead, the stream is defined implicitly using the input/output mapping of the CEP bucket.
When an event stream is defined, Siddhi internally creates an input handler, which we can use to send events into the system on that stream. We can also add callbacks to event streams; these receive notifications when events are produced on those streams.
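The relationship between defining a stream, its input handler, and its callbacks can be sketched in a few lines of Java. `MiniEngine`, `defineStream`, `InputHandler.send`, and `addCallback` are hypothetical names chosen for this illustration; the sketch delivers events straight to the stream's callbacks and leaves out queries entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: defining a stream yields an input handler, and callbacks
// can be attached to the stream. Names are illustrative, not Siddhi's API.
final class MiniEngine {
    // Callbacks registered per stream ID.
    private final Map<String, List<Consumer<Object[]>>> callbacks = new HashMap<>();

    // Input handler for one stream: sending data notifies that stream's callbacks.
    final class InputHandler {
        private final String streamId;
        private InputHandler(String streamId) { this.streamId = streamId; }

        void send(Object[] data) {
            for (Consumer<Object[]> cb : callbacks.get(streamId)) {
                cb.accept(data);
            }
        }
    }

    // Defining a stream registers an (initially empty) callback list and returns its input handler.
    InputHandler defineStream(String streamId) {
        callbacks.putIfAbsent(streamId, new ArrayList<>());
        return new InputHandler(streamId);
    }

    void addCallback(String streamId, Consumer<Object[]> callback) {
        List<Consumer<Object[]>> list = callbacks.get(streamId);
        if (list == null) throw new IllegalArgumentException("Undefined stream: " + streamId);
        list.add(callback);
    }
}
```

Rejecting callbacks on undefined streams mirrors the rule that a stream must be defined before it can be used.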
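Through its SQL-like query language, Siddhi supports several kinds of complex event processing queries: basic queries (filters, windows, and transforms), join queries, and pattern and sequence queries.
A basic Siddhi Event Query Language query has the following structure: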
from <incoming stream>[<incoming stream filter>]#<window on the stream> insert into <outgoing stream> <outgoing stream attributes>
When events arrive from the incoming event stream, they are filtered, and only events that satisfy the filter flow to the window. Depending on its configuration, a window retains some of the incoming events for a certain period for further processing, such as aggregation calculations. Finally, the events are projected onto the outgoing event stream according to the defined outgoing stream attributes.
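The filter, window, and projection stages can be sketched as a small Java class. To keep the example short, this sketch assumes a length window (retaining the last N events) rather than a time window, and the class `BasicQuery` and its members are hypothetical; it is not how Siddhi is implemented internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of from <stream>[filter]#window insert into <out> <attributes>,
// using a length window (last N events) for simplicity.
final class BasicQuery {
    private final Predicate<Object[]> filter;                    // [<incoming stream filter>]
    private final int windowLength;                              // window size in events
    private final Deque<Object[]> window = new ArrayDeque<>();
    private final Function<Deque<Object[]>, Object[]> projector; // <outgoing stream attributes>
    private final List<Object[]> output = new ArrayList<>();     // stands in for the outgoing stream

    BasicQuery(Predicate<Object[]> filter, int windowLength,
               Function<Deque<Object[]>, Object[]> projector) {
        this.filter = filter;
        this.windowLength = windowLength;
        this.projector = projector;
    }

    void receive(Object[] event) {
        if (!filter.test(event)) return;      // only events passing the filter reach the window
        window.addLast(event);
        if (window.size() > windowLength) {
            window.removeFirst();             // the oldest event leaves the window
        }
        output.add(projector.apply(window));  // project the window contents onto the output
    }

    List<Object[]> getOutput() { return output; }
}
```

Here the projector sees the whole window, which is what lets it compute aggregates such as a count over the retained events.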
The following is a code snippet demonstrating a simple filter query in Siddhi.
from StockQuartStream[symbol == 'FB'] insert into FacebookStockStream price, volume;
When a query is defined, it implicitly defines its output stream. In this case, the query above implicitly defines FacebookStockStream with the attributes price (of type float) and volume (of type int).
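The type inference described here can be sketched as a lookup of each projected attribute in the input schema. This hypothetical `OutputSchemaInference` helper handles only plain attribute references such as `price, volume`; renamed attributes and aggregations such as `count(price)` are deliberately out of scope for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: infer the output stream schema from the input schema and
// the projected attribute names, e.g. "insert into FacebookStockStream price, volume".
final class OutputSchemaInference {
    static Map<String, String> infer(Map<String, String> inputSchema, List<String> projected) {
        Map<String, String> output = new LinkedHashMap<>();  // keeps projection order
        for (String attr : projected) {
            String type = inputSchema.get(attr);
            if (type == null) {
                throw new IllegalArgumentException("Unknown attribute: " + attr);
            }
            output.put(attr, type);  // the output attribute inherits the input attribute's type
        }
        return output;
    }
}
```

Applied to `StockQuartStream (symbol string, price float, volume int)` with the projection `price, volume`, this yields `{price=float, volume=int}`.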
To count how many Facebook stock trades occurred in the last minute, we can extend the query above with a time window:
from StockQuartStream[symbol == 'FB']#window.time(1 min) insert into FacebookCountStockStream price, volume, count(price) stockCount;
Similar to windows, Siddhi (as used in WSO2 CEP 2.1.0) also supports transforming streams using #transform(..). For more information on transforms, refer to the complete Siddhi Event Query Language reference.
The architecture of a basic Siddhi query (having Filter, Transform, and Window) is as follows:
Siddhi basic query
Here, events flow from the 'Input Handler' of the incoming 'Event Stream' to its 'Stream Junction'. The Stream Junction is responsible for sending the events to all components registered on that Event Stream. Siddhi has two main types of stream subscribers: Stream Callbacks, which notify users of event occurrences on a particular stream, and Query Handler Processors, which filter and transform events for further processing. Only events that pass the filter conditions are output by the Query Handler Processor; these are then fed into the Window Processor, where they are stored for time-based, length-based, uniqueness-based, or other custom processing. Next, the events are fed into the Query Projector, which performs attribute-level processing such as avg(price), group by, and having. Finally, the output of the Query Projector is sent to its registered Query Callbacks and to the Stream Junction of its output stream, which feeds the events to all the queries and Stream Callbacks registered on that Event Stream.
These output streams are defined implicitly by inference from the query, so we do not need to define them explicitly.
In WSO2 CEP 2.x, we use Query Callbacks to notify users of output events.
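The fan-out role of a Stream Junction can be sketched as a list of subscribers that each receive every event sent on the stream. The `StreamJunction` class and `Subscriber` interface below are hypothetical; a query is modeled simply as a subscriber on its input junction that sends results to its output junction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a Stream Junction: it forwards every event to all
// subscribers registered on the stream (stream callbacks and query processors).
final class StreamJunction {
    interface Subscriber {
        void receive(Object[] event);
    }

    private final String streamId;
    private final List<Subscriber> subscribers = new ArrayList<>();

    StreamJunction(String streamId) { this.streamId = streamId; }

    void subscribe(Subscriber s) { subscribers.add(s); }

    // Deliver the event to every registered subscriber, in registration order.
    void send(Object[] event) {
        for (Subscriber s : subscribers) {
            s.receive(event);
        }
    }

    String getStreamId() { return streamId; }
}
```

Wiring a filter query between two junctions (for example, `StockQuartStream` to `FacebookStockStream`) then makes the query's output visible to every subscriber of the output stream, which is how one query's output can feed further queries.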
Siddhi condition tree
Conditions in Siddhi have the tree structure shown above, and they are evaluated using depth-first search. To achieve high performance, Siddhi currently relies on the user to place the condition that is least likely to succeed on the leftmost side, which increases the chance of detecting a false result early.
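Why the leftmost position matters can be seen in a small condition-tree sketch. The hypothetical `Condition` class below evaluates AND/OR nodes depth-first, left child first, and uses Java's short-circuit operators, so for an AND node the right subtree is skipped whenever the left one is false.

```java
import java.util.function.Predicate;

// Hypothetical condition tree. Evaluation is a depth-first walk that visits the
// left child first and short-circuits, so a rarely-true condition on the left of
// an AND lets most events be rejected without evaluating the right subtree.
abstract class Condition {
    abstract boolean eval(Object[] event);

    static Condition leaf(Predicate<Object[]> p) {
        return new Condition() {
            @Override boolean eval(Object[] event) { return p.test(event); }
        };
    }

    static Condition and(Condition left, Condition right) {
        return new Condition() {
            // right is evaluated only when left is true
            @Override boolean eval(Object[] event) { return left.eval(event) && right.eval(event); }
        };
    }

    static Condition or(Condition left, Condition right) {
        return new Condition() {
            // right is evaluated only when left is false
            @Override boolean eval(Object[] event) { return left.eval(event) || right.eval(event); }
        };
    }
}
```

For an OR node the reasoning is mirrored: placing the condition most likely to be true on the left ends evaluation early.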
Siddhi time window
Temporal event processing is achieved through windows. In Siddhi, when an event arrives at a 'Window Processor', the processor creates a corresponding expired event carrying the expiry timestamp and stores it in the window. The Window Processor also forwards the incoming event to the 'Query Projector' for further processing. For a time window, when an expired event is created and added to the window, Siddhi also schedules an executor to remove events from the window when they expire. The executor monitors the window, removes expired events in first-in, first-out order, and passes them to the Query Projector.
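A simplified time window can be sketched in Java as follows. Instead of a scheduled executor, this sketch assumes that events arrive in timestamp order and that expiry is triggered by an explicit `advanceTo(now)` call, which keeps it deterministic. `TimeWindow`, `WindowEvent`, and `advanceTo` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical time window. Each incoming event is forwarded as an in-event and a
// matching expired event is stored with its expiry time. advanceTo(now) stands in
// for a scheduled executor and emits due expired events in FIFO order.
// Assumes events arrive in timestamp order.
final class TimeWindow {
    static final class WindowEvent {
        final Object[] data;
        final long timestamp;     // arrival time for in-events, expiry time for expired events
        final boolean expired;
        WindowEvent(Object[] data, long timestamp, boolean expired) {
            this.data = data; this.timestamp = timestamp; this.expired = expired;
        }
    }

    private final long windowMillis;
    private final Deque<WindowEvent> expiring = new ArrayDeque<>();  // ordered by expiry time
    private final List<WindowEvent> toProjector = new ArrayList<>(); // what the projector receives

    TimeWindow(long windowMillis) { this.windowMillis = windowMillis; }

    void receive(Object[] data, long timestamp) {
        advanceTo(timestamp);                                            // flush anything already due
        toProjector.add(new WindowEvent(data, timestamp, false));        // forward the in-event
        expiring.addLast(new WindowEvent(data, timestamp + windowMillis, true)); // store its expired event
    }

    // Remove, first-in first-out, every stored event whose expiry time has been reached.
    void advanceTo(long now) {
        while (!expiring.isEmpty() && expiring.peekFirst().timestamp <= now) {
            toProjector.add(expiring.removeFirst());
        }
    }

    List<WindowEvent> getProjectorInput() { return toProjector; }
    int size() { return expiring.size(); }
}
```

Because every event has the same window length, expiry times are ordered like arrival times, so a FIFO queue is sufficient.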
Expired events are vital in Siddhi because Siddhi relies on them to calculate aggregations at the Query Projector. In the Query Projector, arriving events are transformed and ordered to create output events that match the output Event Stream Definition. During projection, Siddhi also performs aggregations based on the event type: in-events add to the aggregate, and expired events remove their contribution from it.
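The add-on-in-event, subtract-on-expired-event rule makes aggregations incremental: the projector never rescans the window. The hypothetical `RunningAggregates` class below sketches this for `count`, `sum`, and `avg`. Returning 0.0 for the average of an empty window is an arbitrary choice made for this sketch.

```java
// Hypothetical incremental aggregator: in-events add to the running state,
// expired events subtract from it, so no window rescan is needed.
final class RunningAggregates {
    private long count;
    private double sum;

    // Called for every event arriving from the window.
    void onEvent(double value, boolean expired) {
        if (expired) {
            count--;
            sum -= value;
        } else {
            count++;
            sum += value;
        }
    }

    long count() { return count; }
    double sum() { return sum; }
    // Sketch choice: an empty window reports 0.0 rather than an undefined average.
    double avg() { return count == 0 ? 0.0 : sum / count; }
}
```

Fed by a time window, the in-event and expired event of the same price cancel out once that event leaves the window.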
Siddhi supports joining two streams; the following query demonstrates how two streams can be joined based on a condition.
from StockQuotesStream#window.time(5 min) as sqs join HighFrequentTweetStream#window.time(15 min) as hfts on sqs.symbol == hfts.company insert into InterestingStockQuotesStream sqs.symbol as company, sqs.price as lastTreadedPrice, hfts.words as wordsTweeted;
Here, events from StockQuotesStream and HighFrequentTweetStream are joined only if the symbol of the StockQuotesStream event equals the company of the HighFrequentTweetStream event.
The architecture of Siddhi join query is as follows:
Siddhi join query
A join query always has two Handler Processors, one for each input stream it joins. When an event from one stream reaches the In-Stream Join Processor, it is matched against all the events currently in the other stream's Window Processor. Matched events are sent to the Query Projector to create output in-events; at the same time, the original event is added to its own Window Processor, where it remains until it expires. Similarly, when an event expires from its Window Processor, it is matched against all the events in the other stream's Window Processor, and the matched events are sent to the Query Projector to create output expired events.
Note: In spite of Siddhi's optimizations, a join query is expensive in terms of performance, because the Window Processor is locked during matching to avoid race conditions and to ensure an accurate join. Therefore, users should avoid matching huge windows on high-volume streams. Depending on the scenario, using appropriate window sizes (by time or length) or the within keyword helps achieve maximum performance.
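The matching steps of a join can be sketched with two linked `JoinSide` objects, one per input stream. This hypothetical sketch is single-threaded, so it omits the window locking mentioned in the note, and it leaves expiry to explicit `expire` calls instead of a real time window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch of one side of a window join. Single-threaded, so the
// locking described above is omitted; expiry is triggered by explicit calls.
final class JoinSide {
    final List<Object[]> window = new ArrayList<>();          // this stream's window contents
    final List<Object[][]> inOutput = new ArrayList<>();      // pairs for output in-events
    final List<Object[][]> expiredOutput = new ArrayList<>(); // pairs for output expired events
    JoinSide other;                                           // the opposite stream's side
    private final BiPredicate<Object[], Object[]> on;         // join condition as (this side, other side)

    JoinSide(BiPredicate<Object[], Object[]> on) { this.on = on; }

    static void link(JoinSide a, JoinSide b) { a.other = b; b.other = a; }

    private void match(Object[] event, List<Object[][]> sink) {
        for (Object[] candidate : other.window) {
            if (on.test(event, candidate)) sink.add(new Object[][]{event, candidate});
        }
    }

    // Arrival: match against the other window, then keep the event in this window.
    void receive(Object[] event) {
        match(event, inOutput);
        window.add(event);
    }

    // Expiry: remove the event from this window, then match it again for expired output.
    void expire(Object[] event) {
        window.remove(event);   // arrays compare by identity, so this removes that exact event
        match(event, expiredOutput);
    }
}
```

Every arrival scans the entire opposite window, which is why large windows on high-volume streams make joins costly.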
Pattern and sequence matching queries are among the best-known queries in CEP. They are implemented as state machines. A sample pattern query is as follows:
from every (a1 = purchase[price < 10]) -> a2 = purchase[price > 10000 and a1.cardNo == a2.cardNo] within 1 day insert into potentialFraud a1.cardNo as cardNo, a2.price as price, a2.place as place;
Here, a potentialFraud event is fired only if two purchases with the same card are made within one day, where the first purchase is less than $10 and the second is greater than $10,000.
In a pattern query, other purchase events (for example, of different cards) can occur between the first and the second matched events. In a sequence query, by contrast, the matched events must arrive consecutively.
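The difference between pattern and sequence matching can be illustrated with a hand-written, simplified matcher for the fraud example. `FraudMatcher` and `Purchase` are hypothetical. In sequence mode, this sketch drops a partial match as soon as a non-completing event arrives, which is a simplification of sequence semantics rather than a description of Siddhi's exact behavior.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified matcher for:
//   every a1 = purchase[price < 10] -> a2 = purchase[price > 10000, same card] within 1 day
// PATTERN mode tolerates unrelated events between a1 and a2; SEQUENCE mode
// requires a2 to be the very next event after a1.
final class FraudMatcher {
    static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

    static final class Purchase {
        final String cardNo; final double price; final long time;
        Purchase(String cardNo, double price, long time) {
            this.cardNo = cardNo; this.price = price; this.time = time;
        }
    }

    private final boolean sequence;
    private final List<Purchase> partial = new ArrayList<>(); // events that matched state a1
    final List<Purchase[]> matches = new ArrayList<>();       // completed (a1, a2) pairs

    FraudMatcher(boolean sequence) { this.sequence = sequence; }

    void receive(Purchase p) {
        // Try to complete existing partial matches with this event as a2.
        for (Iterator<Purchase> it = partial.iterator(); it.hasNext(); ) {
            Purchase a1 = it.next();
            if (p.time - a1.time > ONE_DAY_MS) {
                it.remove();                          // "within 1 day" has elapsed
            } else if (p.price > 10000 && p.cardNo.equals(a1.cardNo)) {
                matches.add(new Purchase[]{a1, p});
                it.remove();                          // this a1 is consumed by the match
            } else if (sequence) {
                it.remove();                          // sequence: a2 had to come next
            }
        }
        // "every": each qualifying event starts a new partial match.
        if (p.price < 10) {
            partial.add(p);
        }
    }
}
```

With the purchases $5 (card 1234), $50 (card 9999), $20,000 (card 1234), the pattern matcher reports one match, whereas the sequence matcher reports none, because the $50 purchase breaks the required consecutive order.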
The architecture of pattern and sequence query is as follows:
Siddhi pattern and sequence query
Pattern and sequence queries can have many Handler Processors: one for each incoming event stream. After a Handler Processor receives events, it passes them to the Inner Handler Processors, which are responsible for processing the states of the pattern or sequence query. Each Inner Handler Processor holds all the events that have partially matched up to its state; when a new event arrives, it checks whether the event satisfies its filter condition together with those partially matched events. If there is a match, it passes the previously matched events and the current event on to the next state (the next Inner Handler Processor).
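The chaining of Inner Handler Processors can be sketched generically as a list of states, each holding the partial matches that have reached it. In this hypothetical `PatternChain` sketch, the first state behaves like `every` (it keeps listening after a match), later states use pattern semantics, and `within` is not handled. The states are visited from last to first, so a single event advances a given match by at most one state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch of chained inner handler processors for
// "every s0 -> s1 -> ... -> sN" (pattern semantics, no "within" handling).
final class PatternChain {
    static final class State {
        final BiPredicate<List<Object[]>, Object[]> condition;     // (events matched so far, new event)
        final List<List<Object[]>> partials = new ArrayList<>();   // partial matches waiting here
        State next;
        State(BiPredicate<List<Object[]>, Object[]> condition) { this.condition = condition; }
    }

    private final List<State> states = new ArrayList<>();
    final List<List<Object[]>> completed = new ArrayList<>();

    PatternChain addState(BiPredicate<List<Object[]>, Object[]> condition) {
        State s = new State(condition);
        if (states.isEmpty()) {
            s.partials.add(new ArrayList<>());       // first state waits with an empty match
        } else {
            states.get(states.size() - 1).next = s;  // link the previous state to this one
        }
        states.add(s);
        return this;
    }

    void receive(Object[] event) {
        // Visit states last-to-first so one event advances a match by at most one state.
        for (int i = states.size() - 1; i >= 0; i--) {
            State s = states.get(i);
            List<List<Object[]>> remaining = new ArrayList<>();
            for (List<Object[]> partial : s.partials) {
                if (s.condition.test(partial, event)) {
                    List<Object[]> extended = new ArrayList<>(partial);
                    extended.add(event);
                    if (s.next == null) {
                        completed.add(extended);         // final state reached: full match
                    } else {
                        s.next.partials.add(extended);   // hand over to the next state
                    }
                    if (i == 0) remaining.add(partial);  // "every": the first state keeps listening
                } else {
                    remaining.add(partial);              // pattern: non-matching events are skipped
                }
            }
            s.partials.clear();
            s.partials.addAll(remaining);
        }
    }
}
```

Each state's condition receives the events matched so far, which is how a later state can refer to an earlier one, as `a1.cardNo == a2.cardNo` does in the fraud query.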
For more information on how pattern and sequence queries work, refer to the Siddhi Language Docs.
This article describes the architecture of Siddhi and the rationale behind its architectural decisions. It also explains the key features of Siddhi and gives an overview of the Siddhi syntax used in WSO2 CEP 2.x. We hope it is a good starting point for new developers who want to understand Siddhi and start contributing to it.