[Article] Distributed Scaling of WSO2 Complex Event Processor
By WSO2 Team
- 7 Dec, 2015
By Sajith Ravindra and Miyuru Dayarathna
System scalability is of utmost importance when it comes to online big data analytics. Of the three Vs (volume, velocity, and variety) present in any big data analytics scenario, volume and velocity properties significantly depend on the data analytics system’s performance aspects. In this article we describe a scalability experiment conducted on the WSO2 Complex Event Processor 4.0 (WSO2 CEP) in a distributed computer cluster.
First, we provide an introduction to WSO2 CEP and describe how its server is scaled in a distributed cluster by integrating it with Apache Storm. Next, we provide the details of the benchmark used in this study and the benchmarking process. Finally, we discuss the experiment results and the scaling experience before we conclude the article.
Overview of WSO2 CEP in Apache Storm
WSO2 CEP is a 100% open source, lightweight, easy-to-use, complex event processor. It is capable of listening to incoming data streams in various protocols and generating new events based on the processing logic defined by the user as a Siddhi query (in SiddhiQL). The output can also be given through various transport protocols.
Starting with the latest release (version 4.0), WSO2 CEP can process events in a distributed manner by running the Siddhi engine inside an Apache Storm [1] cluster. The Siddhi queries specified in the WSO2 CEP server’s management console are compiled into a Storm topology that comprises Siddhi bolts, event receiver spouts, and event publisher bolts. When the topology is deployed into the Storm cluster, the components in the Storm topology and the WSO2 CEP servers connect with each other with the aid of the WSO2 CEP Manager. Event streams from external sources are received by WSO2 CEP and passed on to the Storm cluster via ‘receiving spouts’ for processing by ‘Siddhi bolts’. The processed events are then passed back to WSO2 CEP through ‘publishing bolts’, and WSO2 CEP publishes them to external event sinks. The system architecture diagram in figure 1 depicts how a job gets deployed in the Storm cluster.
Figure 1: System architecture of WSO2 CEP on Apache Storm
We used an application benchmark called EmailProcessor during the system scaling experiments. The dataflow graph of the benchmark is shown in figure 2.
Figure 2: Data flow of EmailProcessor benchmark
The benchmark was designed using the canonical Enron email data set [2]. The data set consists of 517,417 emails with a mean body size of 1.8KB, the largest being 1.92MB. The data set we used had undergone an offline cleaning and staging phase in which all the emails were serialized and stored within a single file using Apache Avro. The data injector reads emails from the Avro file, deserializes them, and sends them to Q1 for filtering. Q1 drops emails that do not originate from an address ending with @enron.com. Furthermore, it removes all email addresses that do not end with @enron.com from the To, CC, and BCC fields. Q2 modifies every email by replacing the names of three individuals, Kenneth Lay, Jeffrey Skilling, and Andrew Fastow, in the email body with Person1, Person2, and Person3 respectively. The Q3 operator collects metrics for each email, such as the number of characters, words, and paragraphs in the email body. The processed emails sent from Q3 to Q4 are compressed in Avro format and written to /dev/null, so they are effectively discarded. A correct benchmark implementation filters out 89,230 unwanted emails and outputs 428,187 emails.
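The Q1 to Q3 stages described above can be illustrated with a minimal Python sketch. It is not the benchmark's actual implementation (that is written in SiddhiQL with custom Siddhi functions); the dictionary layout of an email, the function names, and the paragraph-splitting rule are assumptions made only for illustration.

```python
import re

ENRON_SUFFIX = "@enron.com"

# Name replacements applied by Q2, as stated in the article.
NAME_REPLACEMENTS = {
    "Kenneth Lay": "Person1",
    "Jeffrey Skilling": "Person2",
    "Andrew Fastow": "Person3",
}


def q1_filter(email):
    """Drop mail not sent from @enron.com and strip non-Enron recipients."""
    if not email["from"].lower().endswith(ENRON_SUFFIX):
        return None
    cleaned = dict(email)
    for field in ("to", "cc", "bcc"):
        cleaned[field] = [
            addr for addr in email[field] if addr.lower().endswith(ENRON_SUFFIX)
        ]
    return cleaned


def q2_modify(email):
    """Replace the three names in the body with Person1..Person3."""
    body = email["body"]
    for name, alias in NAME_REPLACEMENTS.items():
        body = body.replace(name, alias)
    return {**email, "body": body}


def q3_metrics(email):
    """Count characters, words, and paragraphs in the body.

    Assumption: paragraphs are separated by one or more blank lines.
    """
    body = email["body"]
    paragraphs = [p for p in re.split(r"\n\s*\n", body) if p.strip()]
    return {
        "characters": len(body),
        "words": len(body.split()),
        "paragraphs": len(paragraphs),
    }


def process(emails):
    """Run Q1 -> Q2 -> Q3 and yield (email, metrics) for surviving emails."""
    for email in emails:
        filtered = q1_filter(email)
        if filtered is None:
            continue  # Q1 dropped the email
        modified = q2_modify(filtered)
        yield modified, q3_metrics(modified)


# Hypothetical two-email input: the second one is dropped by Q1.
sample = [
    {"from": "a@enron.com", "to": ["b@enron.com", "c@example.org"],
     "cc": [], "bcc": [], "body": "Kenneth Lay met Andrew Fastow.\n\nSee notes."},
    {"from": "x@example.org", "to": ["b@enron.com"],
     "cc": [], "bcc": [], "body": "hello"},
]
results = list(process(sample))
```

Because each stage looks at one email at a time and keeps no state between emails, any number of copies of the pipeline can run side by side, which is the property the scaling experiment relies on.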
We were motivated to use this benchmark for characterizing the performance of WSO2 CEP by several factors:
- It demonstrates a relatively simple real world example of event processing
- The data objects (i.e. emails) are relatively large compared to a simple microbenchmark, which can stress the event processing system significantly
- Since the application does not involve any stream joins or aggregations, it can be easily scaled by applying the parallel pattern [3]
The benchmark was implemented using SiddhiQL, the query language used for defining CEP jobs in the WSO2 CEP server [4]. The SiddhiQL code of EmailProcessor is shown in listing 1.
The WSO2 CEP server was configured to run in distributed mode with Apache Storm. When started, the WSO2 CEP server compiles the SiddhiQL queries into a Storm topology, deploys it in the Apache Storm cluster, and listens for input events. We also started an event consumer application that waits for the output from the benchmark application. The emails are read from an Avro serialized file and injected into the query network through a data injector application. All the attributes of an input event except regexstr are populated from the email data set. The regexstr field is populated with “(.*)@enron.com”, which is used by “filterQuery1” in listing 1 to filter out emails that don’t originate from enron.com.
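The effect of the regexstr value can be tried out with a small Python sketch. It assumes that regex:find succeeds when the pattern matches anywhere in the input (search semantics); the helper name regex_find and the STRICT pattern are hypothetical and not part of the benchmark.

```python
import re

# Value the article says is placed in the regexstr attribute.
REGEXSTR = "(.*)@enron.com"

# Hypothetical stricter variant: anchored, with the dot escaped.
STRICT = r"^.+@enron\.com$"


def regex_find(pattern, text):
    """Return True if the pattern matches anywhere in text.

    Assumption: this mirrors the search-style semantics of regex:find.
    """
    return re.search(pattern, text) is not None


from_enron = regex_find(REGEXSTR, "kenneth.lay@enron.com")
from_outside = regex_find(REGEXSTR, "someone@example.org")

# The unescaped dot matches any character and the pattern is unanchored,
# so these unusual addresses also pass the benchmark's pattern.
odd_dot = regex_find(REGEXSTR, "user@enronXcom")
odd_suffix = regex_find(REGEXSTR, "user@enron.com.example.org")

# The stricter variant rejects both.
strict_dot = regex_find(STRICT, "user@enronXcom")
strict_suffix = regex_find(STRICT, "user@enron.com.example.org")
```

For the Enron data set the looser pattern probably makes no practical difference, but the distinction matters if the same query is reused on other mail corpora.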
The implementation of the EmailProcessor benchmark uses standard built-in functions of the Siddhi library, namely find() and replace_all(). It also uses custom functions such as filter(), modify(), mostFrequentWord(), and metrics() for operations that are specific to this particular use case.
/* Enter a unique ExecutionPlan */
@Plan:name('EmailProcessor')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('Execution plan of EmailProcessor')

/* define streams/tables and write queries here ... */

@Import('inputEmailsStream:3.0.0')
define stream inputEmailsStream (iij_timestamp long, fromAddress string, toAddresses string, ccAddresses string, bccAddresses string, subject string, body string, regexstr string);

@Export('outputEmailStream:3.0.0')
define stream outputEmailStream (iij_timestamp long, fromAddress string, toAdds string, ccAdds string, bccAdds string, updatedSubject string, bodyObfuscated string);

@name('filterQuery1') @dist(parallel='4', execGroup='filter')
from inputEmailsStream [ regex:find(regexstr, fromAddress) == true ]
select iij_timestamp, fromAddress, toAddresses, ccAddresses, bccAddresses, subject, body
insert into filteredEmailStream1;

@name('filterQuery2') @dist(parallel='4', execGroup='filter')
from filteredEmailStream1
select iij_timestamp, fromAddress, emailProcessorBenchmark:filter(toAddresses) as toAdds, emailProcessorBenchmark:filter(ccAddresses) as ccAdds, emailProcessorBenchmark:filter(bccAddresses) as bccAdds, subject, body
insert into filteredEmailStream2;

@name('modifyQuery1') @dist(parallel='4', execGroup='modify')
from filteredEmailStream2
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, emailProcessorBenchmark:modify(body) as bodyObfuscated1
insert into modifiedEmailStream1;

@name('modifyQuery2_part1') @dist(parallel='4', execGroup='modify')
from modifiedEmailStream1
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replace_all(bodyObfuscated1, 'Kenneth Lay', 'Person1') as bodyObfuscated2
insert into modifiedEmailStream2;

@name('modifyQuery2_part2') @dist(parallel='4', execGroup='modify')
from modifiedEmailStream2
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replace_all(bodyObfuscated2, 'Jeffrey Skilling', 'Person2') as bodyObfuscated3
insert into modifiedEmailStream3;

@name('modifyQuery2_part3') @dist(parallel='4', execGroup='modify')
from modifiedEmailStream3
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, subject, str:replace_all(bodyObfuscated3, 'Andrew Fastow', 'Person3') as bodyObfuscated4
insert into modifiedEmailStream4;

@name('modifyQuery3') @dist(parallel='4', execGroup='modify')
from modifiedEmailStream4
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, emailProcessorBenchmark:mostFrequentWord(bodyObfuscated4, subject) as updatedSubject, bodyObfuscated4 as bodyObfuscated
insert into modifiedEmailStream;

@name('metricsQuery') @dist(parallel='4', execGroup='metrics')
from modifiedEmailStream
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, updatedSubject, bodyObfuscated, emailProcessorBenchmark:metrics(bodyObfuscated) as metrics
insert into metricsEmailStream;

@name('outputQuery') @dist(parallel='1', execGroup='output')
from metricsEmailStream
select iij_timestamp, fromAddress, toAdds, ccAdds, bccAdds, updatedSubject, bodyObfuscated
insert into outputEmailStream;
Listing 1: SiddhiQL based implementation of EmailProcessor benchmark
We use two specialized (i.e. dummy) events at the beginning and at the end of the email event streams to facilitate the performance measurement and verification. The preamble event carries the time of the first event injection while the last event carries the number of events injected. The last event also acts as the signal for the termination of the event injection, which effectively enables calculating the throughput and latency statistics of the experiment.
Note that the last event is sent 20 seconds after the last email object, to make sure that all the email events have been processed by the WSO2 CEP and Apache Storm environments before the last event is received. If it were sent right after the last email, the event consumer application might leave several emails out of the performance statistics. We made sure this delay does not enter the performance calculation by taking the time difference between the timestamp value carried by the first event and the time the last processed email was received by the consumer. We also ensured that clock drift, which could be present in the experiment cluster, did not distort the experiment results by adjusting the time values based on NTP on each node. In these experiments we set the level of parallelism per Storm topology to four in all the query operators except the output operator.
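The measurement scheme described above can be sketched in Python as follows. The class and method names are hypothetical, and the numbers in the usage example are invented purely to show that the 20-second gap before the last event does not affect the result.

```python
class ConsumerStats:
    """Tracks what the consumer needs for the throughput calculation."""

    def __init__(self):
        self.first_injection_ms = None      # carried by the preamble event
        self.last_email_received_ms = None  # consumer-side receive time
        self.emails_received = 0
        self.events_injected = None         # carried by the last event

    def on_preamble(self, injection_time_ms):
        self.first_injection_ms = injection_time_ms

    def on_email(self, received_ms):
        self.emails_received += 1
        self.last_email_received_ms = received_ms

    def on_last(self, events_injected):
        # The arrival time of the last event is deliberately ignored, so
        # the 20-second delay never enters the elapsed-time calculation.
        self.events_injected = events_injected

    def throughput(self):
        """Processed emails per second between first injection and last email."""
        if self.first_injection_ms is None or self.last_email_received_ms is None:
            raise ValueError("preamble or email events missing")
        elapsed_ms = self.last_email_received_ms - self.first_injection_ms
        if elapsed_ms <= 0:
            raise ValueError("no measurable interval")
        return self.emails_received * 1000.0 / elapsed_ms


# Invented example: 1000 emails, the last one received 2000 ms after the
# first injection; the last event arrives 20 s later but is not timed.
stats = ConsumerStats()
stats.on_preamble(injection_time_ms=0)
for i in range(1, 1001):
    stats.on_email(received_ms=i * 2)
stats.on_last(events_injected=1000)
rate = stats.throughput()
```

Timestamps from different machines are only comparable here if the clocks are aligned, which is why the article adjusts the time values based on NTP.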
The experiments were conducted on four compute nodes. Each node was configured with an Intel Xeon E312xx (Sandy Bridge) processor at 2700MHz with 4 cores (1 hardware thread per core) and 8GB RAM. The sizes of the L1 (d/i) and L2 caches were 32KB and 4096KB respectively. The nodes were running the 64-bit version of Ubuntu Linux (kernel 3.13.0-36-generic). We used Oracle JDK 1.7, WSO2 CEP server 4.0, and Apache Storm 0.9.4. Each WSO2 CEP server was configured with a maximum JVM heap of 4GB and each Storm worker with a maximum JVM heap of 2GB.
The experiments were oriented towards evaluating the scalability of the WSO2 CEP distributed deployment. First, we ran the EmailProcessor benchmark on a non-distributed (i.e. standalone) WSO2 CEP server to measure baseline performance. Next, we followed a strong scaling approach in which we measured the throughput of processing a fixed-size input data set with varying numbers of compute nodes. We first deployed one node with one CEP server and one Storm worker. Next, we deployed two nodes, each with one CEP server and one Storm worker. We kept adding similar nodes, up to four, and measured the throughput at each node count. In the multi-node scenarios we distributed the input data set across the CEP servers using a load balancer.
The non-distributed WSO2 CEP server setup gave a baseline throughput of 6556 events/second. The results of the distributed WSO2 CEP experiment are shown in figure 3. The distributed single-node deployment reported about 3700 events/second, a significant drop caused by the overhead of communication between the Apache Storm cluster and WSO2 CEP. In the distributed version the processing happens within the Storm cluster, so the events need to be routed from the WSO2 CEP server to the Storm cluster and the processed data needs to be routed back from Storm to the WSO2 CEP server. The large events containing emails amplified this communication overhead further, which explains the performance drop in the most basic setup of the distributed experiment. However, as we kept adding nodes the system scaled, and we received a throughput of about 11,050 events/second with four nodes.
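The throughput figures quoted above can be turned into speedup and efficiency numbers with a few lines of Python. Only the standalone, one-node, and four-node values appear in the text (the two- and three-node values are only in figure 3), and the distributed values are approximate, so the derived ratios are approximate as well. The function names are illustrative.

```python
# Throughput figures quoted in the article (events/second).
BASELINE_STANDALONE = 6556
DISTRIBUTED = {1: 3700, 4: 11050}  # node count -> approximate throughput


def speedup(throughput, reference):
    """Ratio of a measured throughput to a reference throughput."""
    return throughput / reference


def efficiency(throughput, reference, nodes):
    """Speedup divided by node count (1.0 would be perfectly linear)."""
    return speedup(throughput, reference) / nodes


# Four nodes relative to the distributed single-node deployment.
speedup_vs_one_node = speedup(DISTRIBUTED[4], DISTRIBUTED[1])
efficiency_four_nodes = efficiency(DISTRIBUTED[4], DISTRIBUTED[1], nodes=4)

# Distributed deployments relative to the standalone baseline.
one_node_vs_baseline = speedup(DISTRIBUTED[1], BASELINE_STANDALONE)
four_nodes_vs_baseline = speedup(DISTRIBUTED[4], BASELINE_STANDALONE)

print(f"4 nodes vs 1 distributed node: {speedup_vs_one_node:.2f}x")
print(f"parallel efficiency at 4 nodes: {efficiency_four_nodes:.2f}")
print(f"1 distributed node vs standalone: {one_node_vs_baseline:.2f}x")
print(f"4 distributed nodes vs standalone: {four_nodes_vs_baseline:.2f}x")
```

Comparing against the distributed single-node deployment isolates the effect of adding nodes, while comparing against the standalone baseline also includes the fixed communication overhead of moving events between WSO2 CEP and Storm.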
Figure 3: Scalability of EmailProcessor benchmark on WSO2 CEP distributed mode
From the experiments conducted using the EmailProcessor benchmark we observed the scalability of the WSO2 CEP distributed implementation. The messages used in the EmailProcessor benchmark were large, which kept the CEP server busy encoding and decoding messages and reading from and writing to sockets. As a result, the input rate from a single CEP server was not enough to keep the Storm cluster busy. To increase the input rate to Storm we had to increase the number of CEP nodes.
Increasing the parallelism of Storm components didn’t directly translate into increased performance. Instead, it degraded performance in some cases because of the contention caused by having a large number of tasks per JVM. The optimal parallelism for each component has to be found by experimenting to locate the bottlenecks and hotspots. Similarly, adding more Storm workers per Storm supervisor can also lower performance because of resource contention within the supervisor machine. The number of workers has to be decided by considering the resources available in the supervisor machine.
Better grouping of queries can also increase performance. Queries in the same execution group are executed in the same Siddhi bolt, so the grouping determines the number of Siddhi bolts used in the topology. With a properly grouped set of queries, we can increase performance through better resource utilization.
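As a rough illustration of how execution groups map to bolts, the following Python sketch groups the queries of listing 1 by their execGroup value. The annotation strings are transcribed from the listing; the parsing regex and the function name are assumptions for illustration and are not part of WSO2 CEP.

```python
import re

# @name/@dist annotations transcribed from listing 1.
ANNOTATIONS = """
@name('filterQuery1') @dist(parallel='4', execGroup='filter')
@name('filterQuery2') @dist(parallel='4', execGroup='filter')
@name('modifyQuery1') @dist(parallel='4', execGroup='modify')
@name('modifyQuery2_part1') @dist(parallel='4', execGroup='modify')
@name('modifyQuery2_part2') @dist(parallel='4', execGroup='modify')
@name('modifyQuery2_part3') @dist(parallel='4', execGroup='modify')
@name('modifyQuery3') @dist(parallel='4', execGroup='modify')
@name('metricsQuery') @dist(parallel='4', execGroup='metrics')
@name('outputQuery') @dist(parallel='1', execGroup='output')
"""

PATTERN = re.compile(
    r"@name\('([^']+)'\)\s*@dist\(parallel='(\d+)',\s*execGroup='([^']+)'\)"
)


def group_queries(text):
    """Map each execution group to its parallelism and member queries."""
    groups = {}
    for name, parallel, group in PATTERN.findall(text):
        entry = groups.setdefault(group, {"parallel": int(parallel), "queries": []})
        entry["queries"].append(name)
    return groups


groups = group_queries(ANNOTATIONS)
for group, info in groups.items():
    print(f"{group}: parallel={info['parallel']}, queries={info['queries']}")
```

Under the rule stated above, the nine queries of listing 1 fall into four execution groups, which suggests four kinds of Siddhi bolt in the topology. Splitting or merging groups changes that count, and with it how events travel between bolts.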
In our experience, the complexity and nature of the problem, the size of events, the quality of the network infrastructure, the latency and throughput requirements, and the hardware specification of the machines are important factors to consider when deciding the deployment of the cluster and the number of nodes required. Even so, it’s advisable to perform several rounds of testing during capacity planning.
Running WSO2 CEP in distributed mode is one of the important features added in the latest release (version 4.0.0). In distributed mode, the Siddhi engine is placed in a Storm cluster to run as bolts instead of running inside the WSO2 CEP server itself. This allows multiple instances of the Siddhi engine to be spawned across a Storm cluster, resulting in a system that is horizontally scalable and capable of handling large volumes of events per unit of time. We demonstrated the scalability of WSO2 CEP in distributed mode by using the EmailProcessor benchmark with varying numbers of compute nodes while keeping the data set constant and measuring the output throughput for each setup. We observed almost linear scalability with the EmailProcessor benchmark, with a throughput of about 11,050 events/second for a four-node setup.
[1] https://storm.apache.org/
[2] Klimt, B. and Yang, Y. (2004), "Introducing the Enron Corpus." In Proceedings of the First Conference on Email and Anti-Spam (CEAS), 2 pages.
[3] Ballard, C., et al. (2010), IBM InfoSphere Streams: Harnessing Data in Motion. IBM.
[4] Jayasekara, S., Perera, S., Dayarathna, M., and Suhothayan, S. (2015), "Continuous analytics on geospatial data streams with WSO2 complex event processor." In Proceedings of the 9th ACM International Conference on Distributed Event-Based Systems (DEBS '15). ACM, New York, NY, USA, 277-284. DOI: http://dx.doi.org/10.1145/2675743.2772585