[Article] Get Near Real-Time API Traffic Notifications with WSO2 Complex Event Processor

  • By Nadeesha Gamage
  • 1 Apr, 2015

WSO2 API Manager provides capabilities to build an API ecosystem through which APIs can be advertized and managed. API administration is a key component in API management and it is vital for administrators and API publishers to know how well these meet SLAs. WSO2 API Manager and WSO2 Complex Event Processor (CEP) can provide the ability to monitor API traffic in near real-time and notify administrators if SLAs are violated. This article will focus on how WSO2 CEP can be integrated with WSO2 API Manager and provide benefits to an organization.

Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to relevant product page for more up-to-date product information and resources.

Relates to

WSO2 API Manager Version 1.8.0
WSO2 Complex Event Processor Version 3.1.0


WSO2 API Manager provides a complete solution to design, publish, and manage APIs. It also provides the ability to manage a developer community and to provide the capability to act as an API gateway to route runtime API traffic. WSO2 API Manager provides many core API management capabilities out of the box and also has built-in extension points that can provide out-of-the box integration with other WSO2 products, such as WSO2 Business Activity Monitor (WSO2 BAM) and WSO2 Complex Event Processor (WSO2 CEP). Thrift data publisher is one such extension point that allows WSO2 API Manager to publish runtime data to external thrift receivers using the Apache thrift protocol. Apache thrift is a binary communication protocol that allows high-speed, non-blocking data transmission between two services. The Apache thrift extension point is used by the API Manager to publish runtime data to an external WSO2 BAM instance that would store and process runtime statistics for reporting purposes. The same thrift extension point can be used by the WSO2 API Manager to publish events to WSO2 CEP, which can process these events to provide alerts and notifications as required. A solution can be built using WSO2 API Manager and WSO2 CEP that can provide near real-time alerting to administrators and other organizational stakeholders if certain SLAs are violated. This article will discuss how these products can be integrated and how notifications can be sent to relevant stakeholders in the organization.

Integration of WSO2 API Manager and WSO2 CEP

Figure 1 illustrates how API Manager and WSO2 CEP are integrated.

Figure 1

The API gateway component of the WSO2 API Manager receives API traffic from different applications. The API gateway would validate, rate limit and route the invocation to a relevant backend service. Simultaneously, the API Manager instance would asynchronously publish data to the WSO2 CEP instance using its thrift extension point. Data would be sent as an event stream to WSO2 CEP.

Published data is received by the input event adapter in WSO2 CEP, which can listen to events that come from multiple sources, such as WSO2 product instances (via thrift), HTTP, email, JMS, etc. Once the data is received by the input event adapter, an event builder would convert the input data to an event stream that can be processed by the event processor of the CEP. The event processor would process the events based on a predefined execution plan and determine which events need to be added to the output stream. The event formatter would convert the output events to an event stream that would be sent out by the output event adapter. The output event adapter can publish data to multiple systems including JMS Brokers, HTTP services, or email clients.

This process happens in near real-time, enabling notifications to be sent immediately to inform relevant stakeholders about any SLA violations.


In this scenario, the latency of APIs that are exposed via the API manager are monitored in near real-time by the WSO2 CEP instance and notifications are triggered via email if these SLAs are not adhered to. The email notification in this case would be sent out to the system admin informing them of the SLA violation. This scenario has been illustrated in Figure 2.

Figure 2

  1. As the first step, let’s unpack both API manager and CEP; if both servers are running on the same machine, a port offset needs to be set. In this case, let’s set the port offset of the CEP server as 1. Port offset can be set in the following configuration file {Product_Home}/repository/conf/carbon.xml.
  2. Let’s enable stats publishing in the API manager instance. This can be done by enabling the ‘APIUsageTracking’ in {API_Manager}/repository/conf/api-manager.xml. Note that the default Thrift port also needs to be changed to 7612 as we have set a port offset for the CEP server. After enabling the configuration it should look like what’s shown below.
  3. Now the API manager is configured to publish events to the CEP. Let’s configure the thrift data receiver in the CEP. This can be configured in the {CEP_HOME}/repository/conf/data-bridge/data-bridge-config.xml. Hostname of the thrift data receiver needs to be uncommented with the relevant hostname. Note that the thrift ports need to remain the same and the CEP instance would increment the port numbers at the server startup. The final configuration should look like what’s indicated below.
  4. Email transport sender needs to be enabled in the CEP to allow the CEP to send emails out. This can be configured in {CEP_HOME}/repository/conf/axis2/axis2_client.xml file. Modify the section below to configure the email account credentials from which the email will be sent out.
    <transportSender name="mailto" class="org.apache.axis2.transport.mail.MailTransportSender">
    	<parameter name="mail.smtp.from">[email protected]</parameter>
    	<parameter name="mail.smtp.user">wso2demomail</parameter>
    	<parameter name="mail.smtp.password">mailpassword</parameter>
    	<parameter name="mail.smtp.host">smtp.gmail.com</parameter>
    	<parameter name="mail.smtp.port">587</parameter>
    	<parameter name="mail.smtp.starttls.enable">true</parameter>
    	<parameter name="mail.smtp.auth">true</parameter>
  5. You can now start the CEP and API manager servers. Once the servers are started, an input event stream needs to be created in the CEP to accept the event stream coming from the API manager. This stream definition would be automatically created in the CEP once the event stream is received to the CEP. Hence, invoke an API from the API manager, which will create 2 message streams in the CEP. In addition to these 2 streams, a fault stream definition can also be created when a fault API is invoked. Once the stream definitions are added, you will be able to log in to the admin console of the CEP and view the available data streams. Figure 3 is a screenshot of the available event streams once all the stream definitions are added.

    Figure 3

  6. We now need to add an output stream adapter; follow the steps given in the following URL to add an output event adapter.
  7. Next an execution plan needs to be created within the CEP. The execution plan does the actual processing of events based on a set of Siddhi queries defined. The execution plan would use the data input streams defined in the previous step. In this article, we will define an execution plan to monitor latency in the API responses. To enforce the SLA on the latency of API calls, the execution plan would use org.wso2.apimgt.statistics.response:1.0.0 event stream. We will first navigate to the execution plan section from the left-hand navigation section and click on ‘Add Execution Plan’. Enter an execution plan name and import org.wso2.apimgt.statistics.response:1.0.0 data stream as ‘responseStream’. We will now enter the execution plan below, which would trigger a mail for any API invocation that has a latency of 200ms or more.

    from responseStream[serviceTime>200]

    select api,api_version,serviceTime, applicationName

    insert into emailStream

  8. The above execution plan enters SLA violated events to an output stream, which needs to be created and imported to the execution plan. An output stream can be created by adding the stream name in the ‘Export Stream’ section and selecting ‘Create Stream Definition’ from the ‘Stream Id’ dropdown. This will bring up a window with auto populated values that allow the creation of a stream definition. Scroll down and click on ‘Add Event Stream’ button. This will prompt a user to create an event formatter. Select the Custom Event Formatter and click on ‘Ok’.
  9. The above step would take the user to a page where the event formatter can be created. In the event formatter page you would need to provide the mapping from the output stream to the email format that would be sending out to the admin. In the event formatter page you need to provide the email address, subject, and message body that can be in multiple formats. In this case, we will be defining a text message format that would contain the details of the SLA violated API in the message body. Figure 4 is a screenshot of the event formatter page. Once you have entered the required information, click on the add event formatter button.

    Figure 4

  10. Once the above step is completed, you can see the newly created output stream to the execution plan. The screenshot in Figure 5 provides the execution plan once everything has been completed. If everything is as expected, then add the execution plan to the CEP.

    Figure 5

Once the execution plan is added you can invoke an API via the API manager. If the latency of the API is more than 200ms the CEP would trigger an email to the admin’s email address defined in the output event formatter.

The scenario given above demonstrated how WSO2 CEP can trigger notifications based on the latency of an API. The same process can be followed to trigger notifications in scenarios where the backend API becomes unavailable if a single user is excessively using an API or if an API call is made with certain sensitive information.

Deploying both BAM and CEP together

Given that WSO2 BAM is a key product that needs to be integrated with WSO2 API Manager, it is possible for the API manager to define multiple thrift receivers for BAM and CEP. In this case, the API manager would publish API data to both CEP and BAM at the same time. BAM would use the data for statistics generation while the CEP would use the data for real-time monitoring and notifications.


In this article, we looked at how we can use CEP to process events sent by WSO2 API Manager to determine if API-related SLAs are violated and email the relevant users in the event of any SLA violations. WSO2 CEP can allow the organization to not only monitor many API-related indices, but trigger events in near real-time if any of these violate any predefined SLAs. This gives the opportunity for an organization to provide a better service to the API community.


[1] https://docs.wso2.com/display/CEP310/Architecture

[2] https://docs.wso2.com/display/AM180/Publishing+API+Runtime+Statistics

[3] https://docs.wso2.com/display/CEP310/Output+Email+Event+Adapter