2011/08/01
1 Aug, 2011

Complex Event Processing and Business Rule Management With SOA

  • Shammi Jayasinghe
  • Architect / Associate Director - WSO2

Introduction

In a world on Event Driven Architecture, most of the business decisions are taken on the results obtained by processing many events acquired from different event sources. Processing event streams and making correct and accurate business decisions at the correct time is the most important thing and it makes the maximum outcome from the available resources.

Processing various event streams and generating new events using the content of the received events is done with complex event processing engines. There are number of complex event processing engines and WSO2 Complex Event Processing server is shipped with Drools fusion complex event processing engine and also the Esper™ complex event processing engine can be installed as a p2 feature.

There are number of use cases that can be identified in the real world in which complex event processing can be used effectively and generate useful events.

Eg:

  • Algorithmic trading: Automated trading based on market movement
  • Security and Fraud detection: Detect patterns among events to discover fraudulent activity
  • Network system management: Correlate system or network faults and take action in real-time.

Other than above mentioned use cases there can be many more. In each of these use case the rule applied (query) is differed from each other and it is depending on the event streams that is processing by the engine.

Applies To:

WSO2 Enterprise Service Bus 4.0.0
WSO2 Business Rules Server 1.2.0
WSO2 Complex Event Processing Server 1.0.0
WSO2 Data Services Server 2.6.0

Contents

  • Use case
  • Proxy Service
  • Transaction Validator Rule Service
  • Fraud Detection Service
  • Fraud Detection Data Service

Use case

In this article, we describe on security and fraud detection use case. It's not practical to use only the complex event processing server alone in real world example. With that in mind, lets go through the use case;

There is such a system where customers can create, process and execute transactions. It consists of two main sections, which is to validate the transaction and to execute the transaction. Here we discuss how it could validate the transaction, detect any fraudulent transactions and prevent the system from executing those transactions. We are using the latest versions (V3.2.0) of WSO2 product stack for the implementation of this scenario.

When a customer logs in to the system and place an order, the system creates a transaction validation request and it is sent to the transaction validation system. This validation system is consisting of latest versions of four WSO2 products to perform four main tasks in order to validate the transaction.

The Scenario

As shown in the above diagram, Proxy client sends a request message to the proxy service deployed in the WSO2 Enterprise Service Bus. Then the proxy service invokes the database using ESB dblookup mediator to get all the relevant information of the customer which is already stored in the database. Once that operation completes it sends the information acquired from the database with the transaction amount to the rule service deployed in the WSO2 Business rules server to validate the amount of the transaction. Here current account balance of the customer is available in the data obtained from the database. The validation rule is a simple rule specified for this sample and it checks whether the account balance is greater than the transaction amount.

If the condition satisfies, rule service returns as a valid response or else response will be an invalid transaction. If the transaction is invalid, proxy service sends back the response to the client, as there is no need of fraud detection for invalid transactions.

In the success case of the transaction, proxy service sends an event to the Complex Event Processing server with transaction details. An event processing service has been deployed in WSO2 Complex Event Processing server to detect fraud transactions. Once an event is received, that event would be passed through the complex event processing engine. If conditions specified in the complex event processing query is satisfied, it will generate a new event on fraud detection.

If a fraud is detected, CEP service will publish this new event to an output topic. A data service deployed in WSO2 Data Services Server is listening to this output topic and it will update the database according to the details of the received event. Here update will be locking the customer account, if a fraud detected.

At the same time after sending events to the complex event processing server, ESB proxy service perform a database call with the DB lookup mediator. With that it checks whether customer account is locked or not. If the account is not locked, it will send the success response message to the client. If the account is locked, it will send failure response message.

Proxy Service

Here we are describing how the actual implementation of the above scenario has done.

WSO2 Enterprise Service Bus has a proxy service and all the message mediation is done using that proxy service. Initially transaction request is received by the ESB proxy service from the proxy client.

Sample transaction request received by the proxy service to validate is ;

		<reqFinancTx>
		 <userID>123</userID>
		 <financTx>456</financTx>
		 <amount>50</amount>
		</reqFinancTx>
		

Now we will describe step by step how the message flow happens;

Step 01

Once the request message received to the proxy service deployed in the ESB, it extracts two properties from the incoming message using ESB property mediator as bellow.

	<property name="txAmount" expression="//reqFinancTx/amount/text()"/>
	<property name="financTx" expression="//reqFinancTx/financTx/text()"/>
	

Step 02

All the information related to a user of the system are stored in the database of the system. In this case it is a mysql database. It is stored in a table called BRMS_USER_INFO. With the use of
incoming property userID a DBLookupMediator of the ESB proxy service, query the Database and extract all the required information from the database and set them to relevant properties.

	<dblookup>
		<connection>
			<pool>
				<password>esb</password>
				<user>esb</user>
				<url>jdbc:mysql://localhost:3306/brms_db</url>
				<driver>com.mysql.jdbc.Driver</driver>
			</pool>
		</connection>

		<statement>
			<sql> SELECT * FROM `BRMS_USER_INFO` where `user_id`=? </sql>
			<parameter expression="//reqFinancTx/userID" type="VARCHAR"/>
			<result name="balance" column="account_balance"/>
			<result name="userId" column="user_id"/>
			<result name="userName" column="name"/>
			<result name="access" column="accessability"/>
		</statement>
	</dblookup>

Step 03

Now the next step is to invoke the rule service with the data extracted from database. In order to do that it is needed to prepare an XML message that is compatible with the rule service that we have deployed in the Business Rules Server to validate the transaction.

To prepare the XML message we are using XSLT mediator of the ESB. With the use of that we can get the template of the message from the xslt file and inject the values to the properties of the message. It is done as bellow;

	<xslt key="sampleXSLT">
		<property name="accountBalance" expression="get-property('balance')"/>
	</xslt>
	

Here the value of the property accountBalance is injected.

Sample message is :

	<p:processTransactionRequest xmlns:p="https://brs.carbon.wso2.org" xmlns:xs="https://transactionApprovalService.samples/xsd">
		<p:Transaction>
			<xs:accountBalance><xsl:value-of select="$accountBalance"/></xs:accountBalance>
			<xs:txAmount><xsl:value-of select="amount/text()"/></xs:txAmount>
			<xs:txId><xsl:value-of select="financTx/text()"/></xs:txId>
			<xs:userId><xsl:value-of select="userID/text()"/></xs:userId>
		</p:Transaction>
	</p:processTransactionRequest>

Step 04

Now we have prepared XML message which all the required values injected to validate the transaction. Then, this XML message is sent to a validation rule service called transactionValidatorService deployed in the Business Rules Server using the send mediator of ESB as bellow.

	<send>
		<endpoint>
			<address uri="https://localhost:9764/services/transactionValidatorService"/>
		</endpoint>
	</send>

Sample message sent to rule service :

<p:processTransactionRequest xmlns:p="https://brs.carbon.wso2.org" xmlns:xs="https://transactionApprovalService.samples/xsd">
	<p:Transaction>
		<xs:accountBalance> 10000</xs:accountBalance>
		<xs:txAmount> 50</xs:txAmount>
		<xs:txId> 456</xs:txId>
		<xs:userId>123</xs:userId>
	</p:Transaction>
</p:processTransactionRequest>

This is the end of the in-sequence of the proxy service. Now it is needed to configure the output sequence.

Step 05

In the output sequence, we are getting the response from the Validation Rule Service. Depending on that result, it is decided whether to send response back to the client or invoke the Complex Event Processing service further. For that, we need to extract the result from the response of the validation rule service and its as bellow;

<property xmlns:brs="https://brs.carbon.wso2.org" xmlns:ax26="https://transactionApprovalService.samples/xsd" name="RESULT" expression="//brs:processTransactionResponse/brs:TransactionResult/ax26:transactionCanProceed"/>

Step 06

If the result of the validation rule service is “true”, we need to send cloned message to Complex Event Processing Service. If its “false”, we can send back to the client. To perform this switching we have added ESB switch mediator as bellow.

	<switch source="get-property('RESULT')">
		<case regex="true">
			// Do what ever need to send the response to cep and the valid response back
		</case>
		<case regex="false">
			// send the invalid response back to the client
			<send/>
		</case>
	</switch>

Step 07

If the result from the validation rule service is “true” we need to send the response for two paths,

  • 1. Complex Event Processing service for fraud detection
  • 2. Response back to the client

We clone the message and send in two separate paths.

	<clone>
		<target>
			<sequence>
					// Send message to cep to detect frauds
			</sequence>
		</target>
		<target>
			<sequence>
					// Send valid response back to client
			</sequence>
		</target>
	</clone>

Step 08

Lets look how we can send message to CEP to detect frauds. For that also we have a message template. We are injecting values in to that template and publish that message to a topic which is listened by the complex event processing service.

It is done as follows;

	<xslt key="cepXSLT">separate
		<property name="userId" expression="get-property('userId')"/>
		<property name="financTx" expression="get-property('financTx')"/>
	</xslt>
	<send>
		<endpoint>
			<address uri="https://localhost:9766/services/localBrokerService/AllTransactions"/>
		</endpoint>
	</send>

The sample request for the CEP service is like bellow;

	<xss:processTransactionRequest xmlns:xss="https://sample.wso2.com">
		<xss:Transaction>
			<xss:price>58</xss:price>
			<xss:userId>123</xss:userId>
			<xss:txId> 456</xss:txId>
		</xss:Transaction>
	</xss:processTransactionRequest>

If the is a fraud detected by the Complex Event Processing service, it will generate a new event. CEP will publish that event to the output topic and a data service deployed in the WSO2 Data Services Server has subscribed to this topic. Once the data service receive a message from this topic, it will update the database as account is locked.

Step 09

We are using the cloned message to send response back to the client. Before we send response back, it is needed to check whether the customer account is not locked for transactions. For that, we are using DB Lookup mediator of the ESB and check the accessibility for the account. If the accessibility is available, it will send the response as Transaction can proceed !. If the account is locked, it will send the response as Fraud Detected, Account Locked !!!.

Data base is queried like bellow;

	<dblookup>
		<connection>
			<pool>
				<password>esb</password>
				<user>esb</user>
				<url>jdbc:mysql://localhost:3306/brms_db</url>
				<driver>com.mysql.jdbc.Driver</driver>
			</pool>
		</connection>
		<statement>
			<sql> SELECT * FROM `BRMS_USER_INFO` where `user_id`=? </sql>
			<parameter expression="//reqFinancTx/userID" type="VARCHAR"/>
			<result name="balance" column="account_balance"/>
			<result name="userId" column="user_id"/>
			<result name="userName" column="name"/>
			<result name="access" column="accessability"/>
		</statement>
	</dblookup>

Then we again need to change the response message according to the accessibility to the account. For that also, we are using ESB switch mediator as bellow.

	<switch source="get-property('access')">
		<case regex="false">
			<xslt key="accLockedXSLT"/>
			<send/>
		</case>
		<case regex="true">
			<xslt key="txProceedXSLT"/>
			<send/>
		</case>
	</switch>

That is how the proxy service in ESB has configured to handle overall message flow. You can find the complete proxy configuration in the attachment.

Transaction Validate Rule Service

Transaction validate rule service is created with two simple rules. If the account balance is greater than the amount of the transaction, transaction will be accepted. Otherwise, transaction will be rejected. Following is the rule used in this service.

rule "Transaction Approval Rule" dialect "mvel" no-loop true salience 4

when
$transaction : Transaction(txAmount  accountBalance )
then
TransactionResult transactionReject = new TransactionResult();
transactionReject.setTransactionCanProceed(false);
transactionReject.setMessage("No enough credits in account to proceed transaction");
insertLogical(transactionReject);
end

There are two classes used in this rule service. You will find the source for these classes attached

  • Transaction
  • TransactionResult

The above rule service is used to validate the transaction. Then it is needed to detect fraud transactions. For that we are using
complex event processing server.

Fraud Detection Service

Fraud detection service is created with the use of Complex Event Processing Server. In CEP server, it is needed to create
a CEP bucket in order to have this service.

When creating a bucket, we need to provide information bellow.

  • complex event processing engine used for that particular bucket
  • Inputs
  • Queries and Output streams

For this sample you can find all of them bellow.

Basic information

Here, we are providing the basic information about the bucket, including name, description and the runtime engine. You need
to have Esper™ feature installed to have EsperCEPRuntime here. You can find more information on creating CEP buckets
here and you can install the Esper™ feature from here

		Bucket Name 		: 	fraudDetectionService

		Bucket Description 	: 	This service will detect fraud request from the client
						         if there are more that five transaction requests arrived
						         within a time frame of one minute

		CEP Runtime 		:  	EsperCEPRuntime 

Input

It is needed to define input streams for a bucket inorder to receive events to the bucket. We are providing topic,
Broker and Mapping to define inputs. If you observe the above defined proxy service carefully, we are sending
xml messages to a topic called AllTransactions. That topic is defined as the input topic.
Any message published to this topic using the broker service localBrokerService will be an input
to the bucket.

In the mapping section, we are defining Xpath prefixes and properties to extract values from the incoming XML messages

		Topic 			: 	AllTransactions
		Broker Name 		: 	localBroker (Select from drop down)

		Mapping		
		Stream 			: 	allTransactions

		XPath prefixes
			Prefix 		:	xss
			Namespace	: 	https://sample.wso2.com  		
		
		Properties
			Name 		:	txCount	
			Xpath		:	//xss:processTransactionRequest/xss:Transaction/xss:price
			Type  	 	:	java.lang.Integer (Select Integer from frop down)
		
			Name 		:	userID	
			Xpath		:	//xss:processTransactionRequest/xss:Transaction/xss:userId
			Type  	 	:	java.lang.String  (Select String from frop down)

Queries

The core of the bucket is the query. All the filtering happens according to the query. We are defining the query here and it differs from complex event processing engine to engine. We have specified the query for EsperCEPEngine here. It will trigger an event, if the transaction count is greater that five for a particular transaction id within a time frame of one minute.

When configuring the query, we are configuring the output also. In that section we are providing a Topic. When a new event is generated,
It will publish that generated event to the topic specified here. In this case it is FraudTransactions. It is possible to configure the
output message also. In the XML mapping section, we can specify the expected output message when a fraud is detected. In the query we specify here, we are getting userID, sum(txCount), txCount. It is possible to inject values of these in to the out put XML message. We have to
specify the name of the variable within curly brackets and it will inject the value of that variable in to the output message.

							 	 	
		
		Query name		:	FraudDetectionQuery
		 	
		Expression as 		: 	Inlined  (Already selected)

		Expression		: 	select userID,sum(txCount),txCount from allTransactions.win:time(1 min) group by   userID having (sum(txCount)>5)
		 	
		Output
		Topic			:	FraudTransactions
		 	
		Broker Name		:	localBroker (Select from drop down)		 	

		XML Mapping			
		XML Mapping Text	:	 <p:insert_fraud_operation xmlns:p="https://pojo.rule.sample/xsd">
							                 <xs:userId xmlns:xs="https://pojo.rule.sample/xsd">{userID}</xs:userId> 
						          </p:insert_fraud_operation>

You can find more information on query definition inwso2complex event processing server documentation

Fraud Detection Data Service

We are using this data service to update value of the accessability column to false, once we get fraud detected event from Complex Event Processing Service

     
                 UPDATE BRMS_USER_INFO set accessability='false' where user_id = ?
                 
     

Conclusion

Complex Event Processing (CEP) systems and Business rule management systems (BRMS) are identified as playing major role in sophisticated systems today and in the future. It will be very useful if we can use them together when making business decisions. The utmost pattern to integrate these heterogeneous systems is to use Service Oriented Architecture (SOA). With this article we have described, how we can integrate four major WSO2 products and use them effectively to implement a transaction validation system. Though this article describes a simple validation system, any one interested in developing production systems to use Business Rules Management Servers and Complex Event Processing Servers can make this as their base and upgrade it to a production stage.

Author

Shammi Jayasinghe, Senior Software Engineer, WSO2 Inc.

Esper™ is a registered trademark of EsperTech.

 

About Author

  • Shammi Jayasinghe
  • Architect / Associate Director
  • WSO2 Inc