2016/04/19

[Tutorial] Integrating Apache Nifi with WSO2 Complex Event Processor

  • Janaki Thevanathan
  • - WSO2
Archived Content
This content is provided for historical perspective only, and may not reflect current conditions. Please refer to the WSO2 analytics page for more up-to-date product information and resources.


Prerequisites

WSO2 CEP 4.0.0 or above. Download here: https://wso2.com/products/complex-event-processor/
Quick start guide can be found here
Apache Nifi – 0.3.0 or above Download here
Getting started guide can be found here

Introduction

In this article, we’ll talk about the process of integrating the Apache Nifi framework with WSO2 Complex Event Processor (WSO2 CEP). Nifi is a dataflow system based on the concepts of flow-based programming; it supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic, and provides a web-based user interface for creating, visualizing, editing, monitoring, and administering automated data flows (refer to Apache Nifi user guide for more details).

We can integrate Nifi so that it pushes events to WSO2 CEP and receives events from it. For this, we’ll also need a basic understanding of complex event processing.


WSO2 Complex Event Processor

WSO2 CEP is a lightweight, easy-to-use, open source complex event processing server available under the Apache Software License v2.0. WSO2 CEP provides the opportunity to immediately identify interesting event occurrences and event patterns within the event cloud and take real-time action to ensure maximum use of that event. Its architecture looks like this:

Figure 1


Sample scenario

In our scenario, we are going to use Apache Nifi to read a file from the local system, push its events to CEP and then write the CEP output back to a file. To start with, we are going to create a simple event flow in CEP with an event receiver (of type HTTP), an event stream and an event publisher (also of type HTTP).

Firstly, follow the steps in the quick start guide or watch the getting started video in order to create our own event flow.

Figure 2

Now the meat of the lesson begins. We’ll use Apache Nifi to send events to the WSO2 CEP HTTP input adapter and retrieve events from the WSO2 CEP HTTP output adapter. To do so, we need to create two data flows in Nifi. One will read the CSV file from the local system, convert it to JSON and send it to the WSO2 CEP endpoint. The other will listen to the WSO2 CEP output adapter and save the content it receives as a file on the local system.

Figure 3


Setting up WSO2 CEP

Starting CEP is easy (note: for detailed instructions, see the starting the server guide). Run the following command inside the <CEP_HOME>/bin directory:

  • On Linux: sh wso2server.sh
  • On Windows: wso2server.bat --run

Once the server has started, we can run the Management Console by typing its URL in a web browser. The URL appears next to “Mgt Console URL” in the start script log that is displayed in the command window.

In this example, the Management Console URL is https://10.100.4.130:9443/carbon/

Open the Management Console and log in as the admin user (by default, enter “admin” for both the username and the password).


1. Create a stream

Click on the Main tab. Under Manage, click Streams to open the Available Event Streams page. Then click Add Event Stream to open the Define New Event Stream page. Enter the following information on that page.

Event stream details

  • Event stream name - org.wso2.event.sensor.stream
  • Event Stream Version - 1.0.0

Meta data

  • timestamp - type long
  • sensorId - type int
  • sensorName - type string

Correlation data

  • longitude - type double
  • latitude - type double

Payload data

  • humidity - type float
  • sensorValue - type double

After entering these details, click Add Event Stream to save the information and create a stream.
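
For readers who prefer to see the stream as data, here is a minimal sketch that writes the same definition as a Python dictionary and prints it as JSON. The key names (name, version, metaData, correlationData, payloadData) and the upper-case type names are an assumption about the JSON layout CEP uses for stream definitions; check them against the Add Event Stream documentation before relying on them.

```python
import json


def attribute(name, attr_type):
    """Return one stream attribute as a {"name": ..., "type": ...} entry."""
    return {"name": name, "type": attr_type.upper()}


# Assumption: key names mirror the JSON stream-definition layout of CEP.
stream_definition = {
    "name": "org.wso2.event.sensor.stream",
    "version": "1.0.0",
    "metaData": [
        attribute("timestamp", "long"),
        attribute("sensorId", "int"),
        attribute("sensorName", "string"),
    ],
    "correlationData": [
        attribute("longitude", "double"),
        attribute("latitude", "double"),
    ],
    "payloadData": [
        attribute("humidity", "float"),
        attribute("sensorValue", "double"),
    ],
}


def stream_id(definition):
    """A stream is addressed by its name and version joined with a colon."""
    return "{}:{}".format(definition["name"], definition["version"])


print(stream_id(stream_definition))
print(json.dumps(stream_definition, indent=2))
```

The three attribute groups correspond one-to-one to the Meta data, Correlation data and Payload data sections of the form.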

Figure 4

(If you need more information, please point your browser to our Add Event Stream documentation page)


2. Create an event receiver

Click on the Main tab. Under Manage, click Receivers to open the Available Event Receivers page. Then click Add Event Receiver to open the Create a New Event Receiver page. Enter the following information into the create a new event receiver page:

  • EventReceiver name - "httpReceiver"
  • Input Event Adapter Type - http
  • Event Stream - org.wso2.event.sensor.stream
  • Message Format - json

Figure 5

After entering the above details, click Advanced to map the incoming JSON to the stream attributes. Configure the JSON mapping by entering the correct JSONPath value for each attribute, as shown in Figure 6.

Figure 6
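
To make the idea of a JSONPath mapping concrete, the sketch below resolves simple paths of the form $.key against an incoming message and produces the stream attributes. The paths in JSON_MAPPING are assumptions inferred from the field names used later in this article (for example metaData_timestamp on the Nifi side and meta_timestamp on the CEP side); the authoritative values are the ones entered in the mapping form. The resolver is a hypothetical helper that handles only plain dotted paths, not full JSONPath.

```python
import json

# Assumed mapping: stream attribute -> JSONPath into the incoming message.
JSON_MAPPING = {
    "meta_timestamp": "$.metaData_timestamp",
    "meta_sensorId": "$.metaData_sensorId",
    "meta_sensorName": "$.metaData_sensorName",
    "correlation_longitude": "$.correlationData_longitude",
    "correlation_latitude": "$.correlationData_latitude",
    "humidity": "$.humidity",
    "sensorValue": "$.sensorValue",
}


def resolve(path, document):
    """Resolve a plain dotted JSONPath such as $.a.b against a dict."""
    if not path.startswith("$."):
        raise ValueError("unsupported JSONPath: " + path)
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value


def map_event(document, mapping=JSON_MAPPING):
    """Build the stream event by looking up every mapped path."""
    return {name: resolve(path, document) for name, path in mapping.items()}


message = json.loads(
    '{"metaData_timestamp": 4354643, "metaData_sensorId": 701,'
    ' "metaData_sensorName": "temperature",'
    ' "correlationData_longitude": 4.504343,'
    ' "correlationData_latitude": 20.44345,'
    ' "humidity": 2.3, "sensorValue": 4.504343}'
)
event = map_event(message)
print(event)
```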

Then click Add Event Receiver to save the information and create an event receiver.

Again, if you need more information, refer to HTTP Event Receiver documentation.


3. Create an event publisher

Click on the Main tab. Under Manage, click Publishers to open the Available Event Publishers page. Then click Add Event Publisher to open the Create a New Event Publisher page. Enter the following information into the create a new event publisher page.

  • EventPublisher name - "httpPublisher"
  • Event Stream - org.wso2.event.sensor.stream
  • Output Event Adapter Type - http
  • HTTP Client Methods - httpPost
  • URL - http://localhost:9091/contentListener
  • Message Format - text

Click Add Event Publisher to save the information and create an event publisher.

Figure 7

For more details, refer to the HTTP Event Publisher documentation.

The event flow is now complete: it has an event stream, a receiver and a publisher. We could also add an execution plan to process the received events, but that is omitted in this article. Refer to Add an Execution Plan and Samples on processing events for a clear understanding of how to add an execution plan in WSO2 CEP.


4. Create the comma separated file

Of course, we need data, and for this we chose a comma-separated values (CSV) file. Open your text editor, copy the following data and save it in your preferred location. In this example, we’ve saved it in the <NIFI_HOME>/cepFile directory under the name event_json.csv.

4354643,701,temperature,4.504343,20.44345, 2.3,4.504343
5454643,702,temperature,3.504343,15.44345, 4.1,3.544343
1534643,703,temperature,2.346783,23.42367, 2.7,6.452989
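
If you would rather create the file from a script, a sketch like the following writes the three rows and reads them back as a sanity check. The temporary directory is only a stand-in for <NIFI_HOME>/cepFile, and the helper names are hypothetical.

```python
import csv
import os
import tempfile

SAMPLE_ROWS = [
    "4354643,701,temperature,4.504343,20.44345, 2.3,4.504343",
    "5454643,702,temperature,3.504343,15.44345, 4.1,3.544343",
    "1534643,703,temperature,2.346783,23.42367, 2.7,6.452989",
]


def write_sample_csv(directory, filename="event_json.csv"):
    """Create the directory if needed and write the sample rows into it."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, filename)
    with open(path, "w", newline="") as handle:
        handle.write("\n".join(SAMPLE_ROWS) + "\n")
    return path


def read_rows(path):
    """Read the rows back; skipinitialspace drops the blank after a comma."""
    with open(path, newline="") as handle:
        return list(csv.reader(handle, skipinitialspace=True))


# A temporary directory stands in for <NIFI_HOME>/cepFile here.
target_dir = os.path.join(tempfile.mkdtemp(), "cepFile")
csv_path = write_sample_csv(target_dir)
rows = read_rows(csv_path)
print(len(rows), "rows with", len(rows[0]), "columns each")
```

Each row carries seven values, in the same order as the seven attributes of the stream.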

Build the data flow in Apache Nifi

Start Nifi by using the mechanism appropriate for your operating system. Windows users can navigate to <NIFI_HOME>/bin and double-click the run-nifi.bat file. Linux and OS X users can open a terminal window, navigate to the <NIFI_HOME> directory and run the bin/nifi.sh command.

Now open the Nifi UI in your web browser. By default, the web UI runs on http://localhost:8080/nifi.

The dataflows we are going to build in Nifi will consist of the following processors:

  • GetFile - Creates FlowFiles in Nifi from files in a directory on the local system.
  • ConvertCSVToAvro - Converts a CSV file to Avro according to an Avro schema.
  • ConvertAvroToJSON - Converts a binary Avro record into a JSON object.
  • PostHTTP - Performs an HTTP POST request to CEP with the contents of the FlowFile as the body of the message.
  • ListenHTTP - Starts an HTTP server that receives the flow file from the HTTP output adapter of CEP.
  • PutFile - Writes the content of a flow file from Nifi to the local system.

The overall data flow is shown in Figure 8:

Figure 8


1. Send the content of the flow file to CEP HTTP endpoint

Click on the processor icon in the toolbar and drag it to the canvas. This will bring up the Add Processor dialog, where we can select the type of processor we want. You can use tags or search criteria to make it easier to find the processor you’re looking for. Select the GetFile processor and click the Add button in the dialog box.

Figure 9

After the processor is added, configure it so that it meets the minimum requirements to be valid and can be connected to other processors.

Figure 10

Now right click on the processor and click Configure. This will bring up the Configure Processor dialog. Feel free to play around with the other settings. Each setting has a help dialog that can be triggered by clicking on the question mark icon next to the property.

In the Settings tab we get to customize the name of the processor. Give it something reasonable.

Figure 11

After that, click the properties tab and set the input directory. In this example, the HttpReceiver.txt file is found in the location <NIFI_HOME>/cepGetFile. Once that is done, the GetFile processor is fully configured.

Bring the ConvertCSVToAvro processor into the canvas and configure its properties tab by providing a value for Record schema. For our example, the schema is as follows:

{
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "metaData_timestamp", "type": "long"},
    {"name": "metaData_sensorId", "type": "int"},
    {"name": "metaData_sensorName", "type": "string"},
    {"name": "correlationData_longitude", "type": "double"},
    {"name": "correlationData_latitude", "type": "double"},
    {"name": "humidity", "type": "float"},
    {"name": "sensorValue", "type": "double"}
  ]
}
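
To see what the schema does to a CSV row, the sketch below assigns each column to the schema field at the same position and casts it to a matching Python type. It does not produce Avro binary and is not how the ConvertCSVToAvro processor is implemented; it only illustrates the position-to-name mapping and the type conversion. The CASTS table and the helper name are hypothetical.

```python
import json

RECORD_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "metaData_timestamp", "type": "long"},
    {"name": "metaData_sensorId", "type": "int"},
    {"name": "metaData_sensorName", "type": "string"},
    {"name": "correlationData_longitude", "type": "double"},
    {"name": "correlationData_latitude", "type": "double"},
    {"name": "humidity", "type": "float"},
    {"name": "sensorValue", "type": "double"}
  ]
}
""")

# Python casts standing in for the Avro primitive types used in the schema.
CASTS = {"long": int, "int": int, "string": str, "float": float, "double": float}


def csv_line_to_record(line, schema=RECORD_SCHEMA):
    """Map CSV columns to schema fields by position and cast each value."""
    values = [value.strip() for value in line.split(",")]
    fields = schema["fields"]
    if len(values) != len(fields):
        raise ValueError(
            "expected {} columns, got {}".format(len(fields), len(values))
        )
    return {
        field["name"]: CASTS[field["type"]](value)
        for field, value in zip(fields, values)
    }


record = csv_line_to_record(
    "4354643,701,temperature,4.504343,20.44345, 2.3,4.504343"
)
print(json.dumps(record))
```

A row with a missing column or a non-numeric value in a numeric column raises an error here, which corresponds loosely to the failure and incompatible relationships of the processor.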

Figure 12

Don’t forget to visit the settings tab: look at auto terminate relationships and tick failure and incompatible in order to make the processor valid.

Figure 13

Then bring the ConvertAvroToJSON processor into the canvas and configure its properties tab by selecting ‘array’ as the value for JSON container options. Also configure auto terminate relationships on the settings tab by selecting failure.

Figure 14

Finally, add the PostHTTP processor in order to push the JSON events from the HttpReceiver.txt file to the httpReceiver endpoint of CEP. To configure PostHTTP, go to auto terminate relationships and tick failure and success, then set the URL in the properties tab. The URL used to receive events has the following format: http://localhost:9763/endpoints/<event Receiver name>.

In this example, we set the URL value to http://localhost:9763/endpoints/httpReceiver
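
Before wiring up Nifi, it can help to post one event to the receiver by hand. The following is a minimal sketch using only the Python standard library. It assumes CEP is listening for plain HTTP on port 9763 on the local machine and that the receiver accepts a JSON array; adjust the scheme, host and port if your server differs. The helper names are hypothetical, and the call that actually sends the request is left commented out so that the snippet runs without a server.

```python
import json
import urllib.request

# Assumed endpoint; adjust scheme, host, port and receiver name as needed.
RECEIVER_URL = "http://localhost:9763/endpoints/httpReceiver"


def build_request(events, url=RECEIVER_URL):
    """Build a POST request whose body is the list of events as JSON."""
    body = json.dumps(events).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(events, url=RECEIVER_URL, timeout=5):
    """Send the events and return the HTTP status code of the response."""
    request = build_request(events, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


sample_event = {
    "metaData_timestamp": 4354643,
    "metaData_sensorId": 701,
    "metaData_sensorName": "temperature",
    "correlationData_longitude": 4.504343,
    "correlationData_latitude": 20.44345,
    "humidity": 2.3,
    "sensorValue": 4.504343,
}

request = build_request([sample_event])
print(request.get_method(), request.full_url)
# With CEP running, send the event with: print(send([sample_event]))
```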

Figure 15

Once processors are added to the graph and configured, the next step is to connect them to one another.

Move the mouse pointer over the center of the GetFile processor. A new Connection icon appears. Drag the Connection bubble from the GetFile processor to the ConvertCSVToAvro processor until it is highlighted. When you release the mouse, a ‘Create Connection’ dialog appears. Select the success check box to create the relationship between the GetFile processor and the ConvertCSVToAvro processor. Then click the Add button to complete the connection.

Figure 16

Likewise, connect ConvertCSVToAvro to ConvertAvroToJSON and ConvertAvroToJSON to PostHTTP by creating success relationships between them.


2. Listen to CEP output adapter and write content of flow file in local system

Bring the ListenHTTP and PutFile processors into the canvas. Before you can connect these two processors, you need to configure their properties with the minimum requirements.

Configure the ListenHTTP processor with the listening port 9091, the port on which it will start its HTTP server.

Figure 17

Configure the PutFile processor by providing the directory location; this is where we are going to save the flow file. In our example, we set it to <NIFI_HOME>/cepPutFile. Also, select failure and success under auto terminate relationships.
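
To get a feel for what this second flow does, here is a small stand-in written with the Python standard library: an HTTP server that accepts a POST and saves the body to a file. It is only an illustration of the ListenHTTP plus PutFile pair, not a replacement for them. The temporary directory stands in for <NIFI_HOME>/cepPutFile, the file naming is made up, and the sketch binds to any free port so that it can run anywhere; use 9091 to match the publisher URL.

```python
import http.server
import os
import tempfile
import threading
import urllib.request


class SaveBodyHandler(http.server.BaseHTTPRequestHandler):
    """Save every POST body as a numbered file in output_dir."""

    output_dir = tempfile.mkdtemp()  # stand-in for <NIFI_HOME>/cepPutFile
    counter = 0

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        SaveBodyHandler.counter += 1
        name = "event-{}.txt".format(SaveBodyHandler.counter)
        with open(os.path.join(self.output_dir, name), "wb") as handle:
            handle.write(body)
        self.send_response(200)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format, *args):
        pass  # keep the console quiet


def start_listener(port=0):
    """Start the server in a background thread; port 0 picks a free port."""
    server = http.server.HTTPServer(("127.0.0.1", port), SaveBodyHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


server = start_listener()
url = "http://127.0.0.1:{}/contentListener".format(server.server_address[1])

# Post one test body, bypassing any proxy configured in the environment.
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
opener.open(
    urllib.request.Request(url, data=b"humidity:2.3", method="POST"), timeout=5
).close()

saved = sorted(os.listdir(SaveBodyHandler.output_dir))
print(saved)
server.shutdown()
server.server_close()
```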

Figure 18

Now that both processors are configured, connect ListenHTTP to PutFile through a success relationship. The data flows are built and we’re ready to test the scenario.
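
A processor is only valid when each of its relationships is either connected to another processor or auto-terminated. The sketch below writes down the two flows from this article as plain data and checks that rule. The relationship names are the ones mentioned in the steps above; the data layout and the helper function are hypothetical and have nothing to do with how Nifi stores a flow internally.

```python
# Relationships of each processor, as named in this article.
RELATIONSHIPS = {
    "GetFile": {"success"},
    "ConvertCSVToAvro": {"success", "failure", "incompatible"},
    "ConvertAvroToJSON": {"success", "failure"},
    "PostHTTP": {"success", "failure"},
    "ListenHTTP": {"success"},
    "PutFile": {"success", "failure"},
}

# (source, relationship, destination) for every connection on the canvas.
CONNECTIONS = [
    ("GetFile", "success", "ConvertCSVToAvro"),
    ("ConvertCSVToAvro", "success", "ConvertAvroToJSON"),
    ("ConvertAvroToJSON", "success", "PostHTTP"),
    ("ListenHTTP", "success", "PutFile"),
]

# Relationships ticked under auto terminate relationships.
AUTO_TERMINATED = {
    "ConvertCSVToAvro": {"failure", "incompatible"},
    "ConvertAvroToJSON": {"failure"},
    "PostHTTP": {"success", "failure"},
    "PutFile": {"success", "failure"},
}


def unhandled_relationships():
    """Return {processor: relationships} that are neither connected nor terminated."""
    connected = {}
    for source, relationship, _destination in CONNECTIONS:
        connected.setdefault(source, set()).add(relationship)
    problems = {}
    for name, relationships in RELATIONSHIPS.items():
        handled = connected.get(name, set()) | AUTO_TERMINATED.get(name, set())
        missing = relationships - handled
        if missing:
            problems[name] = missing
    return problems


print(unhandled_relationships() or "every relationship is handled")
```

Removing an entry from AUTO_TERMINATED, for example the one for PutFile, makes the check report that processor, which mirrors the warning Nifi shows on an invalid processor.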


Testing the scenario - the ideal flow

Start the data flows by clicking the green start icon in the toolbar.

Once the data flow is started, the GetFile processor reads the CSV file from the local system, writes its content as a flow file and passes it to the ConvertCSVToAvro processor through the success relationship. This processor converts the flow file (CSV) into Avro and passes the content as a flow file to the ConvertAvroToJSON processor through a success relationship. ConvertAvroToJSON converts the content of the flow file (Avro) to JSON and passes it to the PostHTTP processor through the success relationship between them. The PostHTTP processor reads that flow file and pushes its content to the CEP HTTP receiver endpoint.

The event that is pushed (the content of the flow file) to the CEP HTTP receiver endpoint is collected by the event receiver named httpReceiver, which directs the received events to the event stream named org.wso2.event.sensor.stream. The httpPublisher publisher then passes each event from that stream to the CEP output adapter.

Meanwhile, the ListenHTTP processor starts the HTTP server on port 9091 and listens on that port. When CEP publishes an event through the output adapter named httpPublisher, the ListenHTTP processor reads the published event and writes it as a flow file. Once it finishes writing, it sends the flow file to the PutFile processor through the success relationship between them. Finally, the PutFile processor saves the flow file in the configured directory on the local system.

For reference, the events read from the HTTP output adapter (httpPublisher) of CEP are written to the local system as follows:

meta_timestamp:4354643,
meta_sensorId:701,
meta_sensorName:temperature,
correlation_longitude:4.504343,
correlation_latitude:20.44345,
humidity:2.3,
sensorValue:4.504343

meta_timestamp:5454643,
meta_sensorId:702,
meta_sensorName:temperature,
correlation_longitude:3.504343,
correlation_latitude:15.44345,
humidity:4.1,
sensorValue:3.544343

meta_timestamp:1534643,
meta_sensorId:703,
meta_sensorName:temperature,
correlation_longitude:2.346783,
correlation_latitude:23.42367,
humidity:2.7,
sensorValue:6.452989
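
If you want to check the result from a script, the output above is easy to parse. The sketch below assumes the layout shown: one name:value pair per line, a trailing comma on all but the last line of an event, and a blank line between events. Values are kept as strings, and the helper name is hypothetical. Only the first two events are repeated here to keep the example short.

```python
SAMPLE_OUTPUT = """\
meta_timestamp:4354643,
meta_sensorId:701,
meta_sensorName:temperature,
correlation_longitude:4.504343,
correlation_latitude:20.44345,
humidity:2.3,
sensorValue:4.504343

meta_timestamp:5454643,
meta_sensorId:702,
meta_sensorName:temperature,
correlation_longitude:3.504343,
correlation_latitude:15.44345,
humidity:4.1,
sensorValue:3.544343
"""


def parse_events(text):
    """Split the text on blank lines and turn each block into a dict."""
    events = []
    for block in text.strip().split("\n\n"):
        event = {}
        for line in block.splitlines():
            # Drop the trailing comma, then split on the first colon only.
            key, _, value = line.strip().rstrip(",").partition(":")
            if key:
                event[key] = value
        events.append(event)
    return events


events = parse_events(SAMPLE_OUTPUT)
for event in events:
    print(event["meta_sensorId"], event["sensorValue"])
```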

Conclusion

WSO2 CEP is a general-purpose, real-time event processing system that retrieves events from various event sources, processes them and publishes the results to various event sinks. With the integration of Apache Nifi, WSO2 CEP can manage event retrieval and publishing via Apache Nifi’s dataflow system. Since WSO2 CEP processes the data and Nifi manages the data flow, they complement each other to build a very powerful and scalable event-based system.

