[Tutorial] Implementing a WSO2 CEP Extension to Run Machine Learning Models Written in PMML Format

  • By Supun Setunga
  • 19 Aug, 2014
Archived Content
This content is provided for historical perspective only, and may not reflect current conditions. Please refer to the WSO2 analytics page for more up-to-date product information and resources.


Table of contents

  • Introduction
  • Implementing a machine learning model in R
  • Converting the model into PMML format
  • Writing a CEP extension
  • Defining input and output streams in CEP
  • Creating a CEP execution plan
  • Testing
  • Sample


Introduction

Machine learning, a branch of artificial intelligence, focuses on models that learn from known data (the training data set) and use what they have learnt, sometimes called experience, to act on or predict new, unseen data. The training data may come from an unknown probability distribution, and the learner has to build a general model of this data that produces sufficiently accurate predictions for new cases, which are assumed to come from the same population.

WSO2 CEP is an open source event processing server that identifies incoming events within the event cloud and acts on them in real time. Data mining and machine learning, on the other hand, deal with the study and implementation of models that learn from data to identify patterns, and are a fast-growing area in the field of data processing. Integrating WSO2 CEP with such machine learning models enables monitoring, identifying, and capturing patterns in the incoming events in real time. To accomplish this, we can use PMML, an XML-based format developed by the Data Mining Group for defining models produced by data mining and machine learning algorithms.

Implementing a machine learning model in R

In this section, we only discuss how to implement a basic machine learning model in R. Data pre-processing and methods of improving accuracy are outside the scope of this tutorial.

As for the model, a simple Neural Network will be implemented using the ‘nnet’ package in R. Since nnet is an external package, we first need to install it. Start R and run:

> install.packages("nnet")

This will prompt you to select a local mirror, and once it is selected, the package will be downloaded and installed. Thereafter, load the package into the current workspace:

> library("nnet")

The next step is to import the data set that we will use to train the Neural Network. Here we will be using a data set from a Kaggle competition (data set can be found here), which contains one response variable and 22 explanatory variables.

> TrainDataAll <- read.csv("E:/Project/Data1/train.csv")

Then separate the response variable and explanatory variables. Since the response in this scenario is a binary value, we need to treat it as a factor, and not a continuous variable.

> trainData <- data.frame(TrainDataAll[,2:23])

> response <- data.frame(factor(TrainDataAll[,1]))

Converting to a factor changes the column name of the response variable (column names can be checked using the colnames(response) command). Therefore, rename the column of the response variable to “response”.

> colnames(response)[1]<-"response"

Then column-bind the response and the explanatory variables back into a single data frame.

> inputData <- cbind(response ,trainData)

Now the data set has a proper structure, and can be used to create a Neural network.

> ANN <- nnet(response ~ .,inputData, size = 10, rang = 0.01, decay = 0, maxit = 10000)
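As a rough illustration of what this call fits, the sketch below writes out a single-hidden-layer network with 10 hidden units (size = 10). It assumes that all 22 explanatory variables are numeric, so that each one maps to exactly one input node, and that the two-level factor response yields a single logistic output; the symbols w, v, b and σ are introduced here for illustration only.

```latex
\hat{y} = \sigma\!\Big(b + \sum_{j=1}^{10} v_j \,\sigma\big(b_j + \sum_{i=1}^{22} w_{ij}\, x_i\big)\Big),
\qquad \sigma(z) = \frac{1}{1 + e^{-z}}

% Number of weights under the assumptions above:
% (22 inputs + 1 bias) x 10 hidden units + (10 hidden units + 1 bias) x 1 output
(22 + 1) \times 10 + (10 + 1) \times 1 = 241
```

In the call, rang = 0.01 restricts the initial random weights to the range [-0.01, 0.01], decay = 0 switches off weight decay, and maxit = 10000 caps the number of training iterations.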

Additionally, the created Neural Network can be graphically viewed using the following commands.

> library(devtools)

> source_url('https://gist.github.com/fawda123/7471137/raw/c720af2cea5f312717f020a09946800d55b8f45b/nnet_plot_update.r')

> plot.nnet(ANN)

The created Neural Network will look similar to the image shown in Figure 1.

Figure 1

Here, there is only one output node (response), since the data set has a binary response (only two response categories). If there are more than two categories, the number of output nodes should be equal to the number of response categories of the data set.

Converting the model into PMML format

To convert the created Neural Network model into the PMML format, we need another R package called “pmml”. As before, the package can be installed and loaded as follows.

> install.packages("pmml")

> library(pmml)

Then the model can be converted using the following:

> pmml_NN <- pmml(ANN)

> write(toString(pmml_NN),file = "/home/supun/Supun/Project/NeuralNetworkPmml.pmml")

Now we have a Neural Network model written in PMML format. Next, we need to write a CEP extension to evaluate this model and to get the model’s output for any set of incoming data/event.
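Before wiring the file into CEP, it can help to check which fields the exported model declares. The following is a minimal sketch that uses only the XML parser shipped with the JDK (not jpmml) to list the DataField entries of a PMML document. The class and method names are hypothetical and are not part of the extension.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, for illustration only.
final class PmmlDataDictionary {

    // Returns "name:dataType" for every DataField element found in the PMML text.
    static List<String> listDataFields(String pmmlXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(pmmlXml)));
            NodeList fields = doc.getElementsByTagName("DataField");
            List<String> result = new ArrayList<>();
            for (int i = 0; i < fields.getLength(); i++) {
                Element field = (Element) fields.item(i);
                result.add(field.getAttribute("name") + ":" + field.getAttribute("dataType"));
            }
            return result;
        } catch (Exception e) {
            // Wrap parser errors so that callers do not have to handle checked exceptions.
            throw new IllegalArgumentException("Not a well-formed PMML document", e);
        }
    }
}
```

Calling listDataFields() with the contents of NeuralNetworkPmml.pmml should list the response variable together with the explanatory variables, which is a quick way to confirm the names and data types to use later in the CEP stream definition.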

Writing the CEP extension

The extension for the CEP can be written by extending “org.wso2.siddhi.core.query.processor.transform.TransformProcessor” and adding the SiddhiExtension annotation (read more here). To load the PMML model and evaluate it within Java, we use the “jpmml” library.

*Note: Add all the downloaded jpmml libraries and their dependencies to the classpath of the Java project.

The following methods will be overridden inside the extended class.

restoreState() method
protected void restoreState(Object[] objects) {
      // Restore the attribute-name-to-position map saved in an earlier state.
      if (objects.length > 0 && objects[0] instanceof Map) {
            parameterPositions = (Map <String, Integer>) objects[0];
      }
}
init() method

All the initializing work will take place inside this method, and it would take the following form.

protected void init(Expression[] parameters, List <ExpressionExecutor> expressionExecutors,
                    	StreamDefinition inStreamDefinition, StreamDefinition outStreamDefinition,
                    	String elementId, SiddhiContext siddhiContext) {

When we write the Siddhi query later on, we should be able to give the input parameters of the neural network inside that query. Therefore, inside the init() method, the first thing to do is to read the values of each of the parameters in the query.

for (Expression parameter : parameters) {
         // Only stream attributes (variables) are model inputs; constants are skipped.
         if (parameter instanceof Variable) {
                Variable var = (Variable) parameter;
                String attributeName = var.getAttributeName();
                parameterPositions.put(attributeName, inStreamDefinition.getAttributePosition(attributeName));
         }
}
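The name-to-position lookup can be tried out in isolation. The sketch below is a plain-Java stand-in that replaces the Siddhi StreamDefinition with a list of attribute names; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the lookup done against inStreamDefinition.
final class ParameterPositions {

    // Maps each attribute named in the query to its index in the input stream.
    static Map<String, Integer> resolve(List<String> streamAttributes, List<String> queryAttributes) {
        Map<String, Integer> positions = new LinkedHashMap<>();
        for (String name : queryAttributes) {
            int index = streamAttributes.indexOf(name);
            if (index < 0) {
                // Fail early instead of storing an invalid position.
                throw new IllegalArgumentException("Attribute not in input stream: " + name);
            }
            positions.put(name, index);
        }
        return positions;
    }
}
```

Storing the positions once in init() means that processEvent() only has to do an array lookup for each incoming event instead of searching the stream definition by name.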

In addition to the input parameters, the query has to supply the PMML file or the PMML definition of the model to be used. Since this can differ for each user, it should also be possible to give the PMML definition inside the query itself. Therefore, we always treat the first parameter in the Siddhi expression as the PMML definition, which the user provides as a string. Thus, we need to check that the first parameter is a string constant and report an error if it is not.

if (parameters[0] instanceof StringConstant) {
             // Evaluate the constant expression to get the PMML path or definition.
             Expression expression = parameters[0];
             ExpressionExecutor pmmlExecutor = ExecutorParser.parseExpression(expression, null, elementId, false, siddhiContext);
             pmml = (String) pmmlExecutor.execute(null);
} else {
             System.out.println("Cannot find a pmml definition. \nPlease provide the pmml file path" +
                     " or the full pmml definition as the first attribute in the query");
}

When giving the PMML definition inside the Siddhi query, users can either give the path to a file containing the PMML definition, or they can provide the complete PMML definition as a string inside the query. Therefore, we need to address both these cases. To create a PMML object in both the scenarios:

PMML model = null;
if (isFilePath(pmml.toString())) {
   	// The first parameter is a path: read the PMML definition from the file.
   	String path = pmml.toString();
   	File pmmlFile = new File(path);
   	try {
   		model = IOUtil.unmarshal(pmmlFile);
   	} catch (Exception e) {
   		System.err.println("Please specify a valid file path\n"+e.getMessage());
   	}
} else {
   	// The first parameter is the PMML definition itself: parse it from the string.
   	InputSource pmmlSource = new InputSource(new StringReader(pmml.toString()));
   	try {
   		model = IOUtil.unmarshal(pmmlSource);
   	} catch (Exception e) {
   		System.err.println("Please specify a valid pmml definition\n"+e.getMessage());
   	}
}
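The isFilePath() helper used above is not shown in this tutorial. One possible way to write it, given here only as an assumption and not as the implementation used in the extension, is to treat anything that starts with an XML angle bracket as an inline definition and anything else as a path only if it points to an existing file.

```java
import java.io.File;

// Hypothetical implementation of the isFilePath() check; the real one may differ.
final class PmmlSource {

    // Returns true when the value looks like a path to an existing regular file.
    static boolean isFilePath(String value) {
        if (value == null) {
            return false;
        }
        String trimmed = value.trim();
        // An inline PMML definition is XML, so it starts with '<'.
        if (trimmed.isEmpty() || trimmed.startsWith("<")) {
            return false;
        }
        return new File(trimmed).isFile();
    }
}
```

With this approach, a mistyped path is handed to the XML parser and rejected as an invalid PMML definition, so the user sees the second error message rather than the first.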

Once a PMML object is created using the PMML definition of the machine learning model (a Neural Network, in this case), we need to create an evaluator, which can be used to evaluate the model using any given set of inputs.

PMMLManager pmmlManager = new PMMLManager(model);
Evaluator evaluator = (Evaluator) pmmlManager.getModelManager(null, ModelEvaluatorFactory.getInstance());

Using the evaluator, we can identify and extract the input parameters and output parameters of the model.

 List <FieldName> allFields = evaluator.getActiveFields();
 List <FieldName> predictedFields = evaluator.getPredictedFields();
 List <FieldName> outputFields = evaluator.getOutputFields();

For some models, R creates additional input parameters whose values are auto-generated, so users do not have to provide them. However, the PMML definition gives no way to distinguish the actual inputs from these auto-generated parameters. Therefore, we need to filter out the additional parameters by comparing the model's fields with the input parameters given by the user inside the Siddhi query. Here, we expect the user to give only the parameters that are originally required by the model. If the user leaves out a parameter that the model requires, jpmml throws an exception at runtime that names the missing parameter.

List <FieldName> inputs = new ArrayList <FieldName>();
for (FieldName field : allFields) {
   		 // Keep only the model fields that the user named in the Siddhi query.
   		 if (parameterPositions.keySet().contains(field.getValue())) {
   			 inputs.add(field);
   		 }
}
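The filtering step can be sketched without jpmml by using plain strings in place of FieldName objects. The helper below also returns the fields that were not supplied, which could be logged at initialization time as a hint; that second method is an addition made for illustration, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper that mirrors the filtering loop with plain strings.
final class InputFieldFilter {

    // Model fields that the user named in the query, in the model's own order.
    static List<String> supplied(List<String> modelFields, Set<String> queryNames) {
        List<String> result = new ArrayList<>();
        for (String field : modelFields) {
            if (queryNames.contains(field)) {
                result.add(field);
            }
        }
        return result;
    }

    // Model fields absent from the query: either auto-generated or forgotten.
    static List<String> notSupplied(List<String> modelFields, Set<String> queryNames) {
        List<String> result = new ArrayList<>(modelFields);
        result.removeAll(queryNames);
        return result;
    }
}
```

Because auto-generated and forgotten parameters look the same at this point, the second list can only serve as a warning, not as a validation error.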

Finally, we need to create and initialize an output stream to send the output of the model. To do that, we extract the names and the datatypes of the output fields and predicted fields from the PMML and initialize an output stream that has attributes with the same name and datatype.

this.outStreamDefinition = new StreamDefinition().name("pmmlPredictedStream");
// Add one attribute per predicted field, using the data type declared in the PMML.
for (FieldName predictedField : predictedFields) {
    String dataType = evaluator.getDataField(predictedField).getDataType().toString();
    Attribute.Type type = null;
    if (dataType.equalsIgnoreCase("double")) {
        type = Attribute.Type.DOUBLE;
    } else if (dataType.equalsIgnoreCase("float")) {
        type = Attribute.Type.FLOAT;
    } else if (dataType.equalsIgnoreCase("integer")) {
        type = Attribute.Type.INT;
    } else if (dataType.equalsIgnoreCase("long")) {
        type = Attribute.Type.LONG;
    } else if (dataType.equalsIgnoreCase("string")) {
        type = Attribute.Type.STRING;
    } else if (dataType.equalsIgnoreCase("boolean")) {
        type = Attribute.Type.BOOL;
    }
    this.outStreamDefinition.attribute(predictedField.toString(), type);
}
// Add one attribute per output field. If an output field declares no data type,
// fall back to the data type of the first predicted field.
for (FieldName outputField : outputFields) {
    DataType dataType = evaluator.getOutputField(outputField).getDataType();
    if (dataType == null) {
        dataType = evaluator.getDataField(predictedFields.get(0)).getDataType();
    }
    Attribute.Type type = null;
    if (dataType.toString().equalsIgnoreCase("double")) {
        type = Attribute.Type.DOUBLE;
    } else if (dataType.toString().equalsIgnoreCase("float")) {
        type = Attribute.Type.FLOAT;
    } else if (dataType.toString().equalsIgnoreCase("integer")) {
        type = Attribute.Type.INT;
    } else if (dataType.toString().equalsIgnoreCase("long")) {
        type = Attribute.Type.LONG;
    } else if (dataType.toString().equalsIgnoreCase("string")) {
        type = Attribute.Type.STRING;
    } else if (dataType.toString().equalsIgnoreCase("boolean")) {
        type = Attribute.Type.BOOL;
    }
    this.outStreamDefinition.attribute(outputField.toString(), type);
}
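The same if/else chain appears twice above. As a sketch of one way to remove the duplication, the mapping could be moved into a helper such as the one below. The AttrType enum is a stand-in for Siddhi's Attribute.Type so that the example compiles on its own, and the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper; AttrType stands in for Siddhi's Attribute.Type.
final class PmmlTypes {

    enum AttrType { DOUBLE, FLOAT, INT, LONG, STRING, BOOL }

    // Maps a PMML data type name (case-insensitive) to a stream attribute type.
    static AttrType toAttrType(String pmmlDataType) {
        switch (pmmlDataType.toLowerCase(Locale.ROOT)) {
            case "double":  return AttrType.DOUBLE;
            case "float":   return AttrType.FLOAT;
            case "integer": return AttrType.INT;
            case "long":    return AttrType.LONG;
            case "string":  return AttrType.STRING;
            case "boolean": return AttrType.BOOL;
            default:
                // Unlike the loops above, which leave the type as null, this sketch fails fast.
                throw new IllegalArgumentException("Unsupported PMML data type: " + pmmlDataType);
        }
    }
}
```

Failing fast on an unsupported type is a design choice of this sketch: it surfaces the problem when the execution plan is created rather than when the first event is processed.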

That is all we need to initialize the extension. Next, we need to define what the extension should do once a new event has occurred. For that, we override the processEvent() method:

protected InStream processEvent(InEvent inEvent) {

Since we already know what should be the input parameters to the model, we extract the values of each parameter from the newly triggered event.

Map <FieldName, FieldValue> inData = new HashMap <FieldName, FieldValue>();
for (FieldName inputfield : inputs) {
   		 // Read the attribute value from the event and convert it to the type the model expects.
   		 FieldName featureName = new FieldName(inputfield.getValue());
   		 Object featureValue = inEvent.getData(parameterPositions.get(inputfield.getValue()));
   		 inData.put(featureName, EvaluatorUtil.prepare(evaluator, featureName, featureValue));
}

Then pass this set of values to the evaluator and get the model’s output. Finally, the output is sent to the output stream that was defined earlier.

Map <FieldName, ?> result = evaluator.evaluate(inData);
Object[] resltObjct = new Object[result.size()];
int i = 0;
for (FieldName fieldName : result.keySet()) {
    // Convert each jpmml result value into a plain Java object.
    resltObjct[i++] = EvaluatorUtil.decode(result.get(fieldName));
}
return new InEvent("pmmlPredictedStream", System.currentTimeMillis(), resltObjct);
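The loop above fills the payload in the iteration order of the result map. If that order is not guaranteed to match the attribute order of the output stream definition (this depends on the Map implementation the evaluator returns, which is not examined here), the payload can be built from the attribute list instead. The following sketch shows the idea with plain strings as keys; all names are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper that orders result values by the output stream attributes.
final class ResultOrdering {

    // Builds the event payload in the order of the output stream attributes.
    // Attributes with no entry in the result map are left as null.
    static Object[] toPayload(List<String> outputAttributes, Map<String, ?> result) {
        Object[] payload = new Object[outputAttributes.size()];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = result.get(outputAttributes.get(i));
        }
        return payload;
    }
}
```

In the extension itself, the attribute names would come from outStreamDefinition and each value would still need to pass through EvaluatorUtil.decode() before being placed in the array.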

Once the extension is written, export it as a .jar file to <CEP_HOME>/repository/components/lib. Copy all the jpmml libraries to the same folder as well. To make CEP aware of the extension, add the complete class name (e.g. org.wso2.siddhi.extension.PmmlModelExecutor) to a new line in the <CEP_HOME>/repository/conf/siddhi/siddhi.extension file, and start the CEP server.

(Read more on creating a custom transformer extension to WSO2 CEP)

Defining input and output streams in CEP

Next we need to define an input stream for the extension. To do this, run CEP and add a new event stream from Main > Event Streams > Add Event Stream.

Figure 2

Give an appropriate name (e.g. “pmmlInputStream”) and a version for the event stream. For the Payload Data Attributes, add all the input parameters of the model with their datatypes, in the same order as they are defined in the model’s PMML definition. In this example there will not be any meta data or correlation data. Once the stream is created, CEP prompts you to create an event builder for the input stream. Skip this step by picking the Create Later option.

Similarly, create an output stream (e.g. “pmmlPredictedStream”). For the output stream, the payload data attributes can be one or more of the output parameters of the model.

Creating a CEP execution plan

A new execution plan can be created from Main > Execution Plans > Add Execution Plan. For the Import stream, select the input stream created earlier (pmmlInputStream), and for the export stream, select the output stream previously created (pmmlPredictedStream).

Figure 3

Suppose the Siddhi annotation of the extension we wrote earlier has the namespace ‘mlearn’ and the function name ‘getModelPrediction’. Then the Siddhi query expression will be as follows:

from pmmlInputStream#transform.mlearn:getModelPrediction ("/home/supun/Supun/Project/NeuralNetworkPmml.pmml", A_follower_count, … , B_network_feature_3)
select response,Predicted_response
insert into predictedStream

Inside the getModelPrediction() function, we need to give the PMML file path or the complete PMML definition of the model we are using, followed by all the input parameters of the model. In the select statement, we need to give the exact parameters which we defined when creating the output stream.

If all the properties are set correctly, the execution plan should be created properly; you can view a graphical representation from CEP Home > Monitor > Event Flow, which should look like the image depicted in Figure 4.

Figure 4


Testing

To test the created execution plan, we need to send some data using a client and check the output generated. For that, we first need to define the input and output adapters as explained below.

CEP Home > Configure > Input Event Adaptor > Add Input Event Adaptor

Here the name is given as “pmmlInputAdapter” and the type is “wso2event”. Now we need to link the created adapter to the input stream we created earlier (pmmlInputStream) through an event builder. To do that, go to Main > Event Streams, click on “In-Flows” of the created input stream (pmmlInputStream:1.0.0), and click on “Receive from External Event Stream(via Event Builder)”. Give “pmmlInputStream_1.0.0_builder” as the builder name, select the previously created “pmmlInputAdapter” as the “Input Event Adaptor Name”, and give the name and the version of the previously created input stream as the stream name and version (“pmmlInputStream” and “1.0.0” respectively).

Similarly, create an output event adapter from CEP Home > Configure > Output Event Adaptor > Add Output Event Adaptor, with the name “pmmlOutputAdapter” and the type “logger”, since we will be publishing the output to the log. Again, to map this adapter to the output stream created earlier, navigate to Main > Event Streams, and click on “Out-Flows” of our output stream (pmmlPredictedStream:1.0.0). Enter “outputEventFormatter” as the output event formatter name, “pmmlOutputAdapter” as the output event adapter name, “text” as the output event type, and any name for the Unique identification.

Once all the adapters and the event builders/formatters are set, the CEP Event Flow should look like the image depicted in Figure 5.

Figure 5

As the final step, to send data to this extension, we need to create a custom data publisher. A step-by-step guide on creating a data publisher for CEP can be found here.

You can find the complete source code of the extension here.


Sample

You can download a sample that uses the extension created above here. The sample uses the PMML model and the data set discussed earlier, sends one set of feature values to the extension at a time, and obtains the predicted output. Download and unzip the file and follow the steps below to set up and run the sample.

Before we run the sample, we need to set up the extension. To do that, copy "PmmlModelExecutor.jar" to the <CEP_HOME>/repository/components/lib folder. To make CEP aware of the extension, open the <CEP_HOME>/repository/conf/siddhi/siddhi.extension file and add "org.wso2.siddhi.extension.PmmlModelExecutor" on a new line. Then copy all the jar files in the "libs" folder to <CEP_HOME>/repository/components/lib.

To set up and run the sample, first copy the "0201" folder inside "ML-sample" to <CEP_HOME>/samples/artifacts and the "pmml-publisher" folder inside "ML-sample" to <CEP_HOME>/samples/producers. Navigate to <CEP_HOME>/bin and run the command "./wso2cep-samples.sh -sn 0201" in a terminal. The CEP Event Flow should look like the image depicted in Figure 5. Open a new terminal/command prompt, navigate to <CEP_HOME>/samples/producers/pmml-publisher, and run "ant -Dport=7611", where 7611 is the thrift port of your CEP instance. Observe the output of both terminals.

Refer to CEP documentation for more info on running CEP samples.


WSO2 CEP can be empowered with machine learning features by implementing a machine learning model such as a Neural network in R, converting it to PMML format, and then finally writing an extension to evaluate the model within CEP.