2016/02/29

[Tutorial] How to Integrate a Human Task with Real Time Decision Making

  • Minudika Gammanpila
  • Software Engineer - WSO2
Archived Content
This content is provided for historical perspective only, and may not reflect current conditions. Please refer to the WSO2 analytics page for more up-to-date product information and resources.


Applies to

WSO2 Complex Event Processor Version 4.0.0 and above
WSO2 Business Process Server Version 3.5.0 and above

Introduction

In the modern world, a sizeable share of business operations has been automated. Most of these operations, however, still require considerable human attention and involvement before they are executed or finalized.

Before getting started, let’s briefly introduce the two products this tutorial uses to bring a human decision into a real-time flow: WSO2 Complex Event Processor (WSO2 CEP) and WSO2 Business Process Server (WSO2 BPS).


WSO2 Complex Event Processor

WSO2 CEP is a lightweight, easy-to-use, open source complex event processing server. It identifies the most meaningful events within the event cloud, analyzes their impact, and acts on them in real time. It's built for high performance with WSO2 Siddhi and scales out using Apache Storm.


WSO2 Business Process Server

WSO2 BPS is an easy-to-use open source business process server that executes business processes written using the BPMN standard or the WS-BPEL standard. It’s powered by the Activiti BPMN Engine and the Apache ODE BPEL Engine, and provides a complete Web-based graphical console to deploy, manage and view processes as well as process instances.


General procedure of implementation

Figure 1


Example: Disconnecting users from a broadband connection

Consider a broadband data service provider. Such a provider can gather all the information related to a user's data usage, including his/her payment statistics. Now suppose we need to disconnect a user if either of two conditions is met: his/her data usage exceeds the limit, or his/her outstanding payment exceeds the credit limit.

Here’s how this works in practice. WSO2 CEP takes in the data from an outside source. Every 30 days, WSO2 CEP filters out the users who have used more than a set amount - say, 500MB - within that time period. For demo purposes, let’s cut this interval down to 30 seconds and set the amount to a mere 5MB. Once these conditions are hit, the user’s usage and other details are sent to the WSO2 BPS through an HTTP request.

On the WSO2 BPS side, the HTTP request invokes an instance of the process that contains the human task, and someone seated behind the system has to take the final decision on whether or not the connection should be disconnected (that’s the whole purpose of this part - to bring human control into the loop). WSO2 BPS then sends another HTTP request to the WSO2 CEP server, carrying this human decision.
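The filtering rule described above can be sketched in a few lines of Python. This is only an illustration of the idea, not how WSO2 CEP implements its time window: the class name `UsageWindow`, the explicit `now` timestamps and the example email address are assumptions made for the sketch. The limit check uses "at least 5", which is how the execution plan later in this tutorial expresses it.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 30   # demo interval from the text
USAGE_LIMIT_MB = 5    # demo limit from the text


class UsageWindow:
    """Keeps each user's usage events from the last WINDOW_SECONDS (hypothetical helper)."""

    def __init__(self):
        self._events = defaultdict(deque)  # email -> deque of (timestamp, usage)

    def add(self, email, usage_mb, now):
        """Record a usage event and return (total_in_window, limit_reached)."""
        events = self._events[email]
        events.append((now, usage_mb))
        # Drop events that have fallen out of the time window.
        while events and now - events[0][0] >= WINDOW_SECONDS:
            events.popleft()
        total = sum(usage for _, usage in events)
        return total, total >= USAGE_LIMIT_MB


window = UsageWindow()
print(window.add("user@example.com", 2, now=0))   # (2, False)
print(window.add("user@example.com", 2, now=10))  # (4, False)
print(window.add("user@example.com", 3, now=20))  # (7, True)
print(window.add("user@example.com", 1, now=45))  # (4, False)
```

Only when the second element of the result is True would the user's details be handed over for the human decision.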

Figure 2


Implementing the example

Consider the following data flow:

Figure 3

As you can see, it involves quite a few components. Each is listed below with a brief description of what it does, a screenshot and the code involved.


Event streams

InputStream: Holds the data received by the input receiver.

Figure 4

{
  "streamId": "InputStream:1.0.0",
  "name": "InputStream",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "metaData": [
	{
  	"name": "processDefinitionKey",
  	"type": "STRING"
	},
	{
  	"name": "tenantId",
  	"type": "STRING"
	}
  ],
  "correlationData": [],
  "payloadData": [
	{
  	"name": "currentUsage",
  	"type": "LONG"
	},
	{
  	"name": "email",
  	"type": "STRING"
	}
  ]
}
  • It will create the input stream and save the definition as an xml file.
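To make the relationship between this stream definition and an incoming message concrete, here is a small Python sketch that wraps attribute values in the `{"event": {...}}` envelope used by the cURL commands later in this tutorial and checks them against the definition. The helper name `build_event` and the example email address are assumptions for illustration; WSO2 CEP performs its own validation, which this does not reproduce.

```python
import json

# Attribute names and types copied from the InputStream definition above
# (STRING -> str, LONG -> int).
INPUT_STREAM = {
    "metaData": {"processDefinitionKey": str, "tenantId": str},
    "payloadData": {"currentUsage": int, "email": str},
}


def build_event(meta, payload):
    """Wrap values in the event envelope and check them (hypothetical helper)."""
    event = {"metaData": meta, "payloadData": payload}
    for section, attributes in INPUT_STREAM.items():
        for name, expected_type in attributes.items():
            value = event[section].get(name)
            # bool is a subclass of int in Python, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, expected_type):
                raise ValueError(f"{section}.{name} must be {expected_type.__name__}")
    return {"event": event}


message = build_event(
    {"processDefinitionKey": "myProcess", "tenantId": "-1234"},
    {"currentUsage": 2, "email": "user@example.com"},
)
print(json.dumps(message, indent=2))
```

Note that tenantId is a STRING in the definition, so it is passed as "-1234" rather than as a number.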

fromManagerStream: Holds the data received by fromManagerReceiver.

Figure 5

{
  "name": "fromManagerStream",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "payloadData": [
	{
  	"name": "email",
  	"type": "STRING"
	},
	{
  	"name": "isThrottled",
  	"type": "BOOL"
	}
  ]
}
  • It will create the stream named “fromManagerStream” and save the definition as an xml file.

toManagerStream: Holds the data produced as the output of the execution plan.

Figure 6

{
  "name": "toManagerStream",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "payloadData": [
	{
  	"name": "processDefinitionKey",
  	"type": "STRING"
	},
	{
  	"name": "tenantId",
  	"type": "STRING"
	},
	{
  	"name": "email",
  	"type": "STRING"
	},
	{
  	"name": "currentUsage",
  	"type": "LONG"
	},
	{
  	"name": "isThrottled",
  	"type": "BOOL"
	}
  ]
}
  • It will create the stream named “toManagerStream” and save the definition as an xml file.

Publisher

Used to send processed data from WSO2 CEP to WSO2 BPS.

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="Publisher" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="toManagerStream" version="1.0.0"/>
  <mapping customMapping="enable" type="json">
	<inline>{
   "processDefinitionKey":{{processDefinitionKey}},
   "tenantId":{{tenantId}},
   "variables": [   
 	{
   	"name":"currentUsage",
   	"value": {{currentUsage}}
 	},	 
 	{
   	"name":"email",
   	"value": {{email}}
 	},
 	{
   	"name":"isThrottled",
   	"value": {{isThrottled}}
 	}
 ]
}</inline>
  </mapping>
  <to eventAdapterType="http">
	<property encrypted="true" name="http.password"></property>
	<property name="http.client.method">HttpPost</property>
	<property name="http.url">http://127.0.0.1:9763/bpmn/runtime/process-instances</property>
	<property name="http.username">admin</property>
  </to>
</eventPublisher>
  • It will create a publisher named “Publisher” and save the definition as an xml file.
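The inline template above describes the JSON body that is posted to the process-instances endpoint of WSO2 BPS. As a rough illustration, the following Python sketch builds a body of the same shape from one toManagerStream event. The function name `process_start_body` and the sample values are assumptions; how WSO2 CEP itself substitutes the `{{...}}` placeholders (for example, how string values are quoted) is not reproduced here.

```python
import json


def process_start_body(event):
    """Build a body shaped like the publisher's inline template (hypothetical helper)."""
    return {
        "processDefinitionKey": event["processDefinitionKey"],
        "tenantId": event["tenantId"],
        # One name/value entry per process variable, as in the template.
        "variables": [
            {"name": name, "value": event[name]}
            for name in ("currentUsage", "email", "isThrottled")
        ],
    }


sample_event = {
    "processDefinitionKey": "myProcess",
    "tenantId": "-1234",
    "email": "user@example.com",
    "currentUsage": 7,
    "isThrottled": True,
}
print(json.dumps(process_start_body(sample_event), indent=2))
```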

Event receivers

Input receiver: Used to get data to the WSO2 CEP server.

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="inputReceiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
	<from eventAdapterType="http">
    	<property name="transports">all</property>
	</from>
	<mapping customMapping="disable" type="json"/>
	<to streamName="InputStream" version="1.0.0"/>
</eventReceiver>
  • It will create a receiver named “inputReceiver” and save the definition as an xml file.

fromManagerReceiver: Used to get data from WSO2 BPS. This data depends on the manager’s decision.

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="fromManagerReceiver" statistics="disable"
	trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
	<from eventAdapterType="http">
    	<property name="transports">all</property>
	</from>
	<mapping customMapping="disable" type="json"/>
	<to streamName="fromManagerStream" version="1.0.0"/>
</eventReceiver>
  • It will create a receiver named “fromManagerReceiver” and save the definition as an xml file.
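Because this receiver feeds fromManagerStream, a message arriving at it needs to carry the two attributes of that stream, email and isThrottled. The Python sketch below builds such a message, assuming the same `{"event": {"payloadData": ...}}` envelope that the cURL commands in this tutorial use for the input receiver. The exact message sent by the sample BPMN process is not shown in this article, so treat the shape and the helper name `decision_event` as assumptions.

```python
import json


def decision_event(email, is_throttled):
    """Build a manager-decision message for fromManagerStream (hypothetical helper)."""
    if not isinstance(is_throttled, bool):
        raise TypeError("is_throttled must be a bool, matching the BOOL attribute")
    return {"event": {"payloadData": {"email": email, "isThrottled": is_throttled}}}


# A 'Deactivate Service' decision would correspond to isThrottled = true.
print(json.dumps(decision_event("user@example.com", True)))
```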

Execution plan

Note: The logic implemented within the execution plan is explained in the comments.

/*
 Define the stream named "fromManagerStream", which is used to stream events received from the BPS
 */

@Import('fromManagerStream:1.0.0')
define stream fromManagerStream (email string, isThrottled bool);

/*
 Define the stream named "usageStream", which is used to stream events received from the client
 */

@Import('InputStream:1.0.0')
define stream usageStream (meta_processDefinitionKey string, meta_tenantId string, currentUsage long, email string);

/*
 Define the stream named "toManagerStream", which is used to stream events to BPS 
*/

@Export('toManagerStream:1.0.0')
define stream toManagerStream (processDefinitionKey string,tenantId string,email string,currentUsage long,isThrottled bool);

/* 
ThrottledTable is used to keep track of throttled events while the process is executing
*/

define table ThrottledTable(email string);

/*
Select all the events from usageStream which are not throttled yet,
(i.e. events that are not recorded in ThrottledTable) and then insert those events into allowUsageStream
*/

from usageStream[(not((ThrottledTable.email == email) in ThrottledTable))]
select *
insert into allowUsageStream;

/*
Following piece of execution plan does the following:
	1) Get events that came to allowUsageStream within last 30 seconds using a time window.
	2) Check whether the sum of currentUsage of those events is at least 5 (the query uses >=) and expose the result as a boolean named "isThrottled".
	3) Get the sum of the current usage of those events as currentUsage.
	4) Select the values of isThrottled, currentUsage, email, meta_processDefinitionKey, meta_tenantId and group them by email.
	5) Insert all the events that are grouped by email, into groupedStream.
*/

from allowUsageStream#window.time(30 sec)
select sum(currentUsage)>=5 as isThrottled,email, sum(currentUsage) as currentUsage,meta_processDefinitionKey,meta_tenantId
group by email
insert all events into groupedStream;

/*
There may be users who already have exceeded the data limit and yet haven't been throttled because of the managerial decision. There can also be users who are coming into the allowUsageStream for the first time and aren’t registered in the throttle table.

In such cases, we can observe state changing patterns like,
    1) User is throttled -> User is not throttled
    2) User is not throttled -> User is throttled

Since allowUsageStream accepts only not-throttled users, we only need to consider the second of these state changes. When such a pattern is detected, that user's details should be sent to the manager for the human decision to be made. The following code segment does this.
*/

partition with (email of groupedStream)
begin
from every a1=groupedStream,a2=groupedStream[a1.isThrottled != a2.isThrottled]
select a2.meta_processDefinitionKey as processDefinitionKey,a2.meta_tenantId as tenantId,a2.email,a2.currentUsage,a2.isThrottled
insert into throttledStream;
end;

/*
Users who need the manager's attention (i.e. isThrottled is true) are then sent to the BPS through toManagerStream.
*/

from throttledStream[isThrottled == true]
select *
insert into  toManagerStream;

/*---------------------------------------------------------------------------------------------------*
   Following part of the execution plan is concerned with the event data received from BPS
 *---------------------------------------------------------------------------------------------------*/

/*
If the manager has marked a user as a throttled user, that user should be added to the ThrottledTable.
*/

from fromManagerStream[isThrottled == true]
select email
insert into ThrottledTable;

/*
If the manager has marked a user who is already recorded in the ThrottledTable as not throttled, then that user should be removed from the ThrottledTable.
*/

from fromManagerStream[isThrottled == false]
select email,isThrottled
delete ThrottledTable on email == ThrottledTable.email;

/*End of the execution plan*/
  • It will create an execution plan and save as an xml file.
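The part of the execution plan that is hardest to follow is the interaction between the state-change pattern and ThrottledTable. The Python sketch below models just that part, under simplifying assumptions: it sees one isThrottled flag per user per grouped event, it ignores the expired events that "insert all events" also emits, and the class and method names are invented for the illustration. It is a way to reason about the logic, not a replacement for the Siddhi queries.

```python
class ThrottleState:
    """Simplified model of the pattern query plus ThrottledTable (hypothetical)."""

    def __init__(self):
        self.throttled = set()   # plays the role of ThrottledTable
        self._last_flag = {}     # email -> previous isThrottled value seen

    def on_grouped_event(self, email, is_throttled):
        """Return True when this user should be sent to the manager."""
        if email in self.throttled:
            # Already throttled users are filtered out before the window.
            return False
        previous = self._last_flag.get(email)
        self._last_flag[email] = is_throttled
        # Only the "not throttled -> throttled" change is forwarded.
        return previous is False and is_throttled is True

    def on_manager_decision(self, email, is_throttled):
        """Apply the decision received from BPS to the table."""
        if is_throttled:
            self.throttled.add(email)
        else:
            self.throttled.discard(email)


state = ThrottleState()
print(state.on_grouped_event("user@example.com", False))  # False: below the limit
print(state.on_grouped_event("user@example.com", True))   # True: limit just reached
state.on_manager_decision("user@example.com", True)       # manager deactivates the service
print(state.on_grouped_event("user@example.com", True))   # False: user is in the table
```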

Testing the scenario

To test this, start WSO2 CEP and WSO2 BPS and apply the following configuration:

  1. Extract the sample.zip file.
  2. Start the WSO2 CEP server.
  3. Go to Home > Manage > Carbon Applications > Add and add the analyticsApp.car file.
  4. Start the WSO2 BPS server. Make sure you start it with a port offset so that its ports do not clash with those of WSO2 CEP.
  5. Go to Home > Manage > Processes > Add > BPMN, add the broadbandConnection.bar file and upload it. The deployed process can be viewed at BPMN Explorer > Processes.

    Figure 7

  6. To allow the sample process to send an email, you need to configure it to your email address. For that,
    • Open <BPS_HOME>/repository/conf/activiti.xml
    • Add the following properties (in the format below) to the file, using your own email address and password

      Figure 8

      <property name="mailServerHost" value="smtp.gmail.com"/>
      <property name="mailServerPort" value="465"/>
      <property name="mailServerUsername" value="[email protected]" />
      <property name="mailServerPassword" value="password" />
      <property name="mailServerUseTLS" value="false" />
      <property name="mailServerUseSSL" value="true" />
                              
  7. WSO2 BPS requires a reference to the URL of the WSO2 CEP server in order to communicate with it. To set this up, follow these steps:
    • In the WSO2 BPS management console, go to Home > Registry > Browse
    • Go to the location _system/config/
    • Click on Add Collection and give it the name ‘endpoint’ and click on Add. The newly created folder will be opened.

      Figure 9

    • Click on Add Resource and give the following values for required fields then click on Add.
      Name: endpoint.epr
      Media type: text/xml
      Content field:
      <wsa:EndpointReference xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.w3schools.com uep_schema.xsd" xmlns:wsa="http://www.w3.org/2005/08/addressing" xmlns:wsdl11="http://schemas.xmlsoap.org/wsdl/">
        <wsa:Address>http://localhost:9765/endpoints/fromManagerReceiver</wsa:Address>
      </wsa:EndpointReference>

      Note: Change the port number to the port on which the WSO2 CEP server is running on your computer.
    • Go to BPMN-Explorer.
    • To send data to the WSO2 CEP server, open a new terminal or command prompt and run the following cURL command.
      curl -X POST -d "{
        \"event\": {
          \"metaData\": {
            \"processDefinitionKey\": \"myProcess\",
            \"tenantId\": \"-1234\"
          },
          \"payloadData\": {
            \"email\": \"[email protected]\",
            \"currentUsage\": 2
          }
        }
      }" http://localhost:9765/endpoints/inputReceiver

      Note: Change the port number to the port on which WSO2 CEP is running, and change the email address. The email from WSO2 BPS will be sent to this address.
  8. Change the value of currentUsage and send the above command again, then check whether a human task has been invoked in BPMN Explorer. You have to send two or more requests within 30 seconds so that the user's total usage reaches 5. To do so, send the same request with the following currentUsage values.

    Send the first request as follows:

    curl -X POST -d "{
      \"event\": {
        \"metaData\": {
          \"processDefinitionKey\": \"myProcess\",
          \"tenantId\": \"-1234\"
        },
        \"payloadData\": {
          \"email\": \"[email protected]\",
          \"currentUsage\": 2
        }
      }
    }" http://localhost:9765/endpoints/inputReceiver
    

    Then send the second request:

    curl -X POST -d "{
      \"event\": {
        \"metaData\": {
          \"processDefinitionKey\": \"myProcess\",
          \"tenantId\": \"-1234\"
        },
        \"payloadData\": {
          \"email\": \"[email protected]\",
          \"currentUsage\": 2
        }
      }
    }" http://localhost:9765/endpoints/inputReceiver
    

    Then send the third request:

    curl -X POST -d "{
      \"event\": {
        \"metaData\": {
          \"processDefinitionKey\": \"myProcess\",
          \"tenantId\": \"-1234\"
        },
        \"payloadData\": {
          \"email\": \"[email protected]\",
          \"currentUsage\": 3
        }
      }
    }" http://localhost:9765/endpoints/inputReceiver
    
  9. After sending the third request, a task will be invoked at the BPS side.

    Figure 10

  10. Choose ‘Deactivate Service’ as the decision and click on complete task. An email will be sent according to your decision.
  11. Now send the same request again with the previous email address three times.
  12. No user task will be invoked because that user has been throttled at the previous step.
  13. Repeat the same procedure changing the email address and check the behaviour when the decision is ‘Do Nothing’. In this case, the user will not be throttled at the CEP side.
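If you prefer a script to typing the cURL commands by hand, the three requests from the steps above can be sketched in Python with only the standard library. The URL, the example email address, the Content-Type header and the function names are assumptions made for this sketch; adjust the host and port to match your own WSO2 CEP server.

```python
import json
import urllib.request

# Assumed endpoint of the input receiver; change host and port as needed.
CEP_URL = "http://localhost:9765/endpoints/inputReceiver"


def build_request(email, usage, url=CEP_URL):
    """Build one POST request carrying a usage event (hypothetical helper)."""
    body = {
        "event": {
            "metaData": {"processDefinitionKey": "myProcess", "tenantId": "-1234"},
            "payloadData": {"email": email, "currentUsage": usage},
        }
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_all(email, usages):
    """Send one request per usage value; requires a running CEP server."""
    for usage in usages:
        with urllib.request.urlopen(build_request(email, usage)) as response:
            print(usage, response.status)


usages = [2, 2, 3]
print(sum(usages) >= 5)  # True: together the three requests reach the demo limit
```

With both servers running, calling send_all("user@example.com", usages) within 30 seconds corresponds to the three requests in step 8.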

Conclusion

We’ve set up WSO2 CEP and WSO2 BPS to implement a usage-based throttling system of the kind an Internet service provider might use. This implements the process described at the start of the tutorial - one where human tasks are integrated with real-time decision making. This is merely one example use case; many other variations and scenarios can be built in the same way with WSO2 CEP and WSO2 BPS.


