14 Jul, 2014

High Availability Choices for WSO2 Complex Event Processor 3.1.0

  • Shazni Nazeer
  • Senior Lead Solutions Engineer - WSO2

WSO2 Complex Event Processor is a lightweight, easy-to-use, 100% open source complex event processing server licensed under Apache Software License v2.0. In this article we discuss the importance of high availability deployments and the HA deployment options available for WSO2 CEP.


Introduction

Modern enterprise transactions and activities consist of a stream of events. Enterprises that monitor such events in real time and respond to them quickly undoubtedly have a greater advantage over their competitors. Complex event processing is all about listening to such events and detecting patterns in real time without having to store those events. WSO2 CEP fulfills this requirement by identifying the most meaningful events within the event cloud, analyzing their impact, and acting on them in real time. It offers extremely high performance and is massively scalable.

For a full list of features available in WSO2 CEP, go to https://docs.wso2.org/display/CEP310/Features.


How to run WSO2 CEP

  1. Download WSO2 CEP from https://wso2.com/products/complex-event-processor/
  2. Extract the zip archive into a directory. We will call the extracted directory CEP_HOME
  3. Navigate to CEP_HOME/bin in a console (terminal)
  4. Run one of the following commands
    • ./wso2server.sh (on Linux)
    • wso2server.bat (on Windows)
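
If you later run a second CEP instance on the same machine (as we do in this guide), the port offset can also be supplied at startup instead of editing carbon.xml. A minimal sketch, assuming the standard -DportOffset system property of WSO2 Carbon-based servers:

    # Start a node with all of its ports shifted by 1 (e.g. HTTPS 9443 -> 9444)
    ./wso2server.sh -DportOffset=1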

High availability (HA) deployment addresses several needs, most importantly disaster recovery: if two or more instances of a product are running and serving consumer requests, and one instance goes down, the remaining instances keep serving consumers without interrupting their service usage.

HA deployment of WSO2 CEP is supported only in versions 3.1.0 and above. Naturally, we need at least two WSO2 CEP nodes to create an HA deployment. The two or more nodes function simultaneously to process events. Optionally, we may deploy a load balancer in front of the CEP nodes. When clustering WSO2 products, a node may function either as a manager node or as a worker node, and updates and changes in the cluster should always be made through the manager node. In this HA option of WSO2 CEP, however, all nodes are configured as management nodes. One thing to note in the context of CEP is that only one node sends notifications out, even though both nodes function as active nodes. This is depicted in Figure 1 below.

Figure 1

WSO2 CEP supports three major HA deployment options, namely:

  • Full-active-active mode
    In this mode, clients send every request to all of the nodes. Only one node sends notifications out; if the notifying node fails, the other node starts sending notifications.
  • Active-active mode
    This mode is similar to full-active-active mode, except that the requests to the nodes are load balanced.
  • Failover mode
    In this mode, all requests are sent to a single node. If that node fails, the requests are sent to the other node.

WSO2 recommends the failover mode over the other two, since it provides a better guarantee of event ordering.

In this guide we will configure two WSO2 CEP nodes (say Node1 and Node2) in an HA deployment. We will refer to the two nodes' home directories as Node1_CEP_HOME and Node2_CEP_HOME.

Initially, we have to perform some general configuration steps:

  1. First we need to set the port offset of the second node (this is only required if the two nodes run on the same machine). By default, WSO2 products have an offset of 0, which means the server opens port 9443 for request processing. If another node is run on the same machine, it cannot also use offset 0, since it would try to open port 9443 as well, resulting in a port conflict. Therefore, different offsets must be set for the other nodes; if the offset is set to 1, the server opens port 9444 for request processing. This is configured in CEP_HOME/repository/conf/carbon.xml, where you can find a tag named <Offset> to set the offset. We'll set the offset of Node1 to 0 and that of Node2 to 1 (see the carbon.xml sketch after this list).
  2. Now we need to enable clustering and set the membership scheme so that the nodes become well-known members. This is configured in the CEP_HOME/repository/conf/axis2/axis2.xml file. The following is the relevant part of the axis2.xml configuration for Node1.

     
    <clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
        ...
        <parameter name="membershipScheme">wka</parameter>
        ...
        <parameter name="localMemberPort">4000</parameter>
        ...
        <members>
            <member>
                <hostName>127.0.0.1</hostName>
                <port>4000</port>
            </member>
            <member>
                <hostName>127.0.0.1</hostName>
                <port>4001</port>
            </member>
        </members>
    </clustering>

    Note that the 'enable' attribute of the clustering element is set to true and that the membershipScheme parameter is set to wka (well-known address). Also note that the two member entries represent our two nodes in the setup. Node2 will have a similar configuration, except that its localMemberPort parameter will have the value 4001.
  3. Next we need to configure the shared database. By default, WSO2 products use the embedded H2 database for data storage. You may also use other databases such as MySQL or Oracle. In this guide we'll use the H2 database, since its configuration is easy and doesn't require any external database setup.
    The data-source configurations are done in the CEP_HOME/repository/conf/datasources/master-datasources.xml file. Open this file on Node1 and change the url as shown below (i.e., only add AUTO_SERVER=TRUE to the url):

    jdbc:h2:repository/database/WSO2CARBON_DB;AUTO_SERVER=TRUE;DB_CLOSE_ON_EXIT=FALSE

    Since we are sharing the database, Node2 must point to Node1's database. This is done by changing the url value in Node2's master-datasources.xml file to the absolute path of Node1's database file. For example, if Node1's home directory is /home/user/wso2cep-3.1.0_node1, then the url would be:

    jdbc:h2:/home/user/wso2cep-3.1.0_node1/repository/database/WSO2CARBON_DB;AUTO_SERVER=TRUE;DB_CLOSE_ON_EXIT=FALSE
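
For illustration, here is a minimal sketch of the carbon.xml change mentioned in step 1 for Node2 (the <Ports> element and its <Offset> child are the standard Carbon port configuration; other entries are omitted):

    <!-- Node2_CEP_HOME/repository/conf/carbon.xml -->
    <Ports>
        <!-- All default ports are shifted by this offset, e.g. HTTPS 9443 becomes 9444 -->
        <Offset>1</Offset>
        ...
    </Ports>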

Each CEP node needs an HA-enabled execution plan to achieve HA processing. This is enabled per execution plan by adding a configuration such as the one shown below. WSO2 CEP provides high availability on a per-execution-plan basis, so certain execution plans may have high availability enabled while others may not. Execution plan XML configurations are normally stored in CEP_HOME/repository/deployment/server/executionplans/. Execution plans can also be configured through the management console UI.

A few things need to be emphasized when configuring the execution plans:

  • The execution plan name on each node needs to be the same
  • Each node must have identical query expressions (the queries must use the same stream names, the same event types, and the same expressions in every execution plan; the order of the queries needs to be the same as well)
  • Each node needs to have the following backend runtime configurations:
    • Snapshot time interval : 0
    • Distributed Processing : RedundantNode

    ...
    <siddhiConfiguration>
        <property name="siddhi.enable.distributed.processing">RedundantNode</property>
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
    </siddhiConfiguration>
    ...
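
For reference, the following is a rough sketch of a complete HA-enabled execution plan file using the org.foo.data example stream, assuming the CEP 3.x execution plan XML format; the stream names and aliases (dataIn, processedData, org.foo.processedData) are hypothetical, so verify the exact element and attribute names against a plan generated through the management console:

    <executionPlan name="RedundantNodeExecutionPlan" statistics="disable" trace="disable"
                   xmlns="http://wso2.org/carbon/eventprocessor">
        <description>HA-enabled execution plan (sketch)</description>
        <siddhiConfiguration>
            <property name="siddhi.enable.distributed.processing">RedundantNode</property>
            <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        </siddhiConfiguration>
        <importedStreams>
            <stream as="dataIn" name="org.foo.data" version="1.0.0"/>
        </importedStreams>
        <queryExpressions><![CDATA[
            from dataIn select * insert into processedData;
        ]]></queryExpressions>
        <exportedStreams>
            <stream name="org.foo.processedData" valueOf="processedData" version="1.0.0"/>
        </exportedStreams>
    </executionPlan>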


Deployment of full active-active mode

Figure 2 depicts this deployment mode;

Figure 2

In this mode, as mentioned previously, the client sends data to both nodes. Both CEP nodes run the execution plan as defined, but only one node sends notifications out.

To set this up you need to configure the event streams (e.g. org.foo.data) and the execution plans (e.g. Redundant Node Execution Plan) as mentioned previously.

Let's create an actual stream and test this scenario. I'll demonstrate it using an existing sample application shipped with WSO2 CEP: the pizza-shop sample, which you can find in CEP_HOME/samples/producers/pizza-shop/.

  • Start both CEP nodes.
  • Run the following command inside the above sample application directory of Node1;

    ant pizzaPublisherClient


    This creates two streams named deliveryStream and orderStream. You can check whether they were created by logging into the management console and navigating to Main -> Event Processor -> Event Streams. Since the H2 database is shared, the two streams should appear on the other node as well; if not, run the above command on the other node too. If you are running the other node on the same machine with an offset of 1, make sure you set the Thrift port to 7612 instead of 7611 in the sample application's build.xml file. Instead of editing build.xml, you may also pass the -DThriftPort=xxxx command line argument.
  • Next we need to create the execution plan in both nodes. Keep in mind the previously mentioned points while creating the execution plans for HA. To create an execution plan, click Main -> Event Processor -> Execution Plan -> Add Execution Plan, as shown in the following diagram.

Figure 3

    • Enter pizzaOrderProcessingPlan as the Execution plan name
    • Set the snapshot time interval to 0
    • Select “Redundant node” for Distributed Processing
    • Import the streams created earlier - to do this, choose deliveryStream:1.0.0 as the import stream and import it as deliveryStream. Similarly choose orderStream:1.0.0 as the import stream and import it as orderStream.

  • Next enter the following Siddhi query expressions in the Query expressions field.

from deliveryStream select time, orderNo as order_id insert into pizza_deliveries;


from orderStream#window.time(30 seconds) insert into overdueDeliveries for expired-events;


from overdueDeliveries as overdueStream unidirectional join pizza_deliveries#window.length(100) on pizza_deliveries.order_id == overdueStream.orderNo select count(overdueStream.orderNo) as sumOrderId, overdueStream.customerName insert into deliveredOrders;


You may validate the queries by clicking the “Validate Query Expressions” button.

The first query creates a dynamic stream called pizza_deliveries with two fields, time and order_id. As data arrives on deliveryStream, these values are inserted into pizza_deliveries.

The second query applies a 30-second time window to orderStream and copies the events that expire from that window into a dynamically created stream called overdueDeliveries.

The last query joins overdueDeliveries (as overdueStream) with the last 100 events of pizza_deliveries on matching order IDs, and inserts the count of matching orders (as sumOrderId) together with the customer name into another dynamically created stream named deliveredOrders.

  • Then, in the Export Stream section, enter deliveredOrders in the “Value Of” field and select “Create Stream Definition” from the “streamId” drop-down. This brings up the “New Event Stream” window. Click “Add Event Stream”. A pop-up message saying “Stream Definition added successfully” should appear. Following this, another message box appears, offering the user an option to create Output Event Formatters. Close this window; we will create the Output Event Formatter shortly.
  • Next, as a notification test, we will send the output to a database. For this we need to create a MySQL database called ‘testdb’ and a table called delivered_orders, as shown below:

    mysql> create database testdb;
    mysql> use testdb;
    mysql> CREATE TABLE delivered_orders (sum_order_id BIGINT, customer_name VARCHAR(20));
    
  • Next we have to add a datasource to the CEP nodes. For this, click Configure -> DataSource -> Add Data Source. Since we are adding a datasource for the MySQL database we just created, enter the following information:
    Data Source Type: RDBMS
    Name: TESTDB_MYSQL_DB
    Data Source Provider: default
    Driver: com.mysql.jdbc.Driver
    URL: jdbc:mysql://localhost:3306/testdb
    User Name: your MySQL user name
    Password: your MySQL password
  • For the next step we will create an Output Event Adaptor. For this, navigate to Configure -> Event Processor Configs -> Output Event Adaptors and click “Add Output Event Adaptor”. Enter the following details and, after checking the connection, click “Add Event Adaptor”:
    Event Adaptor Name: MySQLOutputAdaptor
    Event Adaptor Type: mysql
    Data Source Name: TESTDB_MYSQL_DB
  • Next we need to create an Event Formatter. For this, navigate to Manage -> Event Processor -> Event Streams, click “out-flows” for the deliveredOrders stream, and then click “Publish to External Event Stream (via Event Formatter)”. This is where we configure the mapping between the deliveredOrders stream and the delivered_orders table in MySQL. Enter the following details:
    Event Formatter Name: PizzaDeliveryEventFormatter
    Output Event Adaptor Name: MySQLOutputAdaptor
    Table Name: delivered_orders
    Execution Mode: Insert or update
    Composite Key Columns: customer_name
    Click 'Advanced' and insert the following mappings (database column to stream attribute):
    sum_order_id: sumOrderId
    customer_name: customerName
    Now click "Add Event Formatter".

    Now everything is defined, and the corresponding configuration files have been created in their respective directories under CEP_HOME/repository/deployment/server/.

    Since, in full-active-active mode, the client sends data to both nodes, we have to modify the pizza-shop sample (after keeping a backup) to send data to both nodes instead of one.

    While data is being processed, if one node goes down (say the active node, which is normally the first node you started), the other node becomes active and sends the notifications.

    You can verify this by killing the active node and sending data. This would result in the second node taking over and writing to the MySQL DB in the above example.
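
    As a quick sanity check (a minimal sketch using the database and table created above), you can query the delivered_orders table while the test runs and confirm that rows keep arriving after the active node is killed:

    mysql> use testdb;
    mysql> SELECT * FROM delivered_orders;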


Deployment of active-active and failover modes

The following diagram depicts these modes;

Figure 4

In active-active mode, the client sends each request to only one of the nodes at a time (the load balancer distributes the requests across the nodes), whereas in failover mode the client always sends data to a single node. The diagram applies to both modes. To set this up, a few additional things need to be configured, apart from the event streams (e.g. org.foo.data) and execution plans (e.g. Redundant Node Execution Plan).

  • Create WSO2 Output Event Adaptors as in the previous example, each pointing to the other node: on Node1, create a WSO2 Output Event Adaptor that sends events to Node2's Thrift port, and vice versa.
  • Create pass-through execution plans (e.g. the execution plan in the figure above) to pass events from their respective distributed streams; a sketch of such a plan's query is shown after this list. Also, create identical distributed incoming streams on both nodes.
  • Create a WSO2 Event Formatter per stream to pass the events to the other CEP node.
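
As an illustration of such a pass-through plan, its query expression can be a single statement that forwards every event from the locally received stream into the shared distributed stream. A minimal Siddhi sketch, where dataIn and distributedDataIn are hypothetical aliases for the imported org.foo.data stream and its distributed counterpart (not names used by the sample):

    from dataIn select * insert into distributedDataIn;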

After these configurations, the distributed stream definitions behave as in full-active-active mode: the redundant-node execution plan runs on both nodes, but only one node sends data out.

To demonstrate this scenario, you may follow the pizza-shop sample above, but your client should send data to only one node at a time (for active-active mode) or to a single node all the time (for failover mode).


Summary

High availability (HA) is a very important requirement in modern applications. Most products in the WSO2 stack provide high availability choices, and WSO2 Complex Event Processor is no exception. In this article we looked at the HA options available in WSO2 CEP, along with some samples. This article is a good starting point if you are planning an HA deployment of WSO2 CEP.

     

About Author

  • Shazni Nazeer
  • Senior Lead Solutions Engineer
  • WSO2