6 May, 2014

Real-time loan processing with WSO2 CEP 3.1 - Part 1

  • Mohanadarshan Vivekanandalingam
  • Technical Lead - WSO2
Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to relevant product page for more up-to-date product information and resources.

Table of contents

  • What is WSO2 CEP and how does it work?
  • High-level view of WSO2 CEP
  • Architecture of WSO2 CEP 3.X
  • How processing happens in the CEP
  • Use case
  • Scenario 1 - Loan request processing
  • Conclusion
  • References

What is WSO2 CEP and how does it work?

The WSO2 Complex Event Processor (CEP) provides complex event processing and event stream processing to enable solutions that require analysis over time, such as fraud detection, algorithmic trading analysis, sales, shipping or order status and other business events. Using WSO2 CEP, developers and architects can easily create queries and analysis of real-time event streams to drive real-time business decisions based on time-based events.

WSO2 CEP 3.X builds on the performance capabilities introduced in 2012 with version 2.0, which included a new WSO2 Siddhi CEP engine that increased performance by up to 10x and support for the high-performance Apache Thrift transport. The newest version has been rewritten to reduce the tight coupling between product components and enable their use in other WSO2 middleware products. Other major enhancements have been made to WSO2’s CEP engine and its event management functionality.

WSO2 CEP 3.X also offers several enhancements to event management. A new event builder for mapping incoming events and a new event formatter for mapping outgoing events both allow users to change the event format as required to other formats, such as XML, JSON, Text or Map, as well as filter attributes of the event. Additionally, the event formatter has been integrated with the registry so users can define the mapping templates in the registry and can use/reuse them dynamically in different execution plans. Support for multiple input event adaptors and output event adaptors gives users the option to select the best approach for each use case.

(From WSO2 CEP 3.0 press release)

High-level view of WSO2 CEP

At a high level, the CEP takes some event streams as input from different data sources, then processes those events (filter, join, window processing, aggregation and patterns). The processed events are then sent out as streams.

Architecture of WSO2 CEP 3.X

The above diagram shows the overall architecture of the CEP. External users can communicate with the CEP in different ways and different data formats.

  • Input, Output Adaptors: For receiving and publishing events - has the configurations to connect to external endpoints.
  • Event Builder, Formatter: For event format transformation - out of the box support for Map, Text, WSO2Event, XML, JSON.
  • Execution Plan: Contains the execution logic and provides an isolated environment for Siddhi runtime engine.
  • Event Stream Manager: Stores stream definitions in the registry of the server. An event stream is the typed flow of events that flows through one or many execution plans.
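The path an event takes through these components can be pictured with a small Python sketch. It is only an analogy: the function names are hypothetical and are not part of any WSO2 API, and the JSON input, text output, sample account and balance limit are illustrative choices.

```python
import json

def input_adaptor(raw_message):
    """Stands in for an input event adaptor: hands over the payload as received."""
    return raw_message

def event_builder(raw_message):
    """Stands in for an event builder: maps a JSON payload to a typed event (a dict here)."""
    data = json.loads(raw_message)
    return {"clientAccountNo": str(data["clientAccountNo"]),
            "accountBalance": float(data["accountBalance"])}

def execution_plan(event):
    """Stands in for an execution plan: a single filter on the account balance."""
    return event if event["accountBalance"] > 10000 else None

def event_formatter(event):
    """Stands in for an event formatter: maps the outgoing event to plain text."""
    return "Account {clientAccountNo} has balance {accountBalance:.2f}".format(**event)

def process(raw_message):
    """Chain the stages; return None when the execution plan filters the event out."""
    event = execution_plan(event_builder(input_adaptor(raw_message)))
    return None if event is None else event_formatter(event)

print(process('{"clientAccountNo": "AC67536", "accountBalance": 250000}'))
```

In the real product each stage is configured rather than coded, so the sketch only shows the order in which an event passes through them.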

How processing happens in the CEP

In the CEP, the Siddhi processing engine carries out real-time event processing. Incoming events are not stored in a database or file system before being processed. Events are processed directly and kept in memory only when needed. This approach lets Siddhi process events with high throughput and low latency. However, the user needs a good understanding of which events are kept in memory (in particular, the size and time span of each window).
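To see why the window length determines how much is held in memory, here is a minimal Python sketch of a sliding time window. It is not Siddhi code; the TimeWindow class is hypothetical, and it assumes events arrive in timestamp order.

```python
from collections import deque

class TimeWindow:
    """Keeps only the events whose timestamp falls within the last `length_ms` milliseconds."""

    def __init__(self, length_ms):
        self.length_ms = length_ms
        self.events = deque()  # (timestamp_ms, value) pairs, oldest first

    def add(self, timestamp_ms, value):
        self.events.append((timestamp_ms, value))
        self._expire(timestamp_ms)

    def _expire(self, now_ms):
        # Drop events that have fallen out of the window so memory stays bounded.
        while self.events and self.events[0][0] <= now_ms - self.length_ms:
            self.events.popleft()

    def total(self):
        """Aggregate over the events currently held in the window."""
        return sum(value for _, value in self.events)

window = TimeWindow(length_ms=60_000)  # one-minute window
for ts, amount in [(0, 100.0), (30_000, 50.0), (61_000, 25.0)]:
    window.add(ts, amount)

print(len(window.events), window.total())
```

The first event has expired by the time the third arrives, so only two events remain in memory. A longer window, or a higher event rate, means more events held at once.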

Use case

The main motivation for this use case is to introduce some new CEP features and to show how to build a proper processing flow in the CEP. The information below should help beginners understand some basic CEP concepts and how to connect WSO2 CEP with WSO2 ESB.

Loan requests - Customers of a bank send loan requests by email. The ESB uses the DBLookup mediator to check, based on the sender's email address, whether the sender is a valid customer. If so, the customer's information is retrieved from the database and sent to the CEP for further processing. In the CEP, we use an event table to check whether the customer's account balance is sufficient to process a loan (that is, whether the balance is more than a specific amount), and then reply by email according to the outcome. Customers who send multiple emails within a given time period are identified through pattern matching and blacklisted. This information is also written to the MySQL database.

Loan payments - An internal service sends loan payment information to the CEP via HTTP. The CEP updates the loan table based on the information in the event and sends the user a notification confirming the payment.

The CEP mainly processes two incoming streams:

  • loanRequestStream - This event stream is created in the ESB from the loan request emails received from customers.
  • loanPaymentStream - This event stream contains information related to loan payments.

Scenario 1 - Loan request processing

Products used

ESB 4.8.1 and CEP 3.1.0

Prerequisites for this use case

Set up the MySQL database

  1. Create a database called clientDB.

    create database clientDB;

  2. Create a table called clientInfoTable to store client information.

    create table clientInfoTable(
    clientName varchar(100),
    clientEmailAddress varchar(100),
    clientPhoneNo varchar(20),
    clientAddress varchar(50),
    clientAccountNo varchar(20)
    );

  3. Insert some values (use the email addresses from which you are going to send emails).

    insert into clientInfoTable values ("Mohan","[email protected]","094771117672","California","AC67536");
    insert into clientInfoTable values ("Dushan","[email protected]","0947343117672","Arizona","AC66736");
    insert into clientInfoTable values ("Ishan","[email protected]","0947783457672","Indiana","AC66746");

  4. Create a table called accountInfoTable to store account information.

    create table accountInfoTable(
    clientAccountNo varchar(20),
    accountBalance double
    );

  5. Insert some values into accountInfoTable.

    insert into accountInfoTable values ("AC67536",250000);
    insert into accountInfoTable values ("AC66736",1000);
    insert into accountInfoTable values ("AC66746",10000);

  6. Create a table called clientLoanTable to store client loan information.

    create table clientLoanTable(
    clientAccountNo varchar(20),
    toalLoanAmount double,
    totalAmountDue double
    );

  7. Insert some values into clientLoanTable.

    insert into clientLoanTable values ("AC67536",200000,220000);
    insert into clientLoanTable values ("AC66746",100000,110000);

  8. Create a table called blackListedCustomerInfoTable to store blacklisted account information. The name matches the table name used by the Siddhi event table later in this article, because MySQL table names can be case-sensitive.

    create table blackListedCustomerInfoTable(
    clientAccountNo varchar(20)
    );
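As a quick sanity check of the sample data, the sketch below loads the same account rows into an in-memory SQLite database and reports which accounts hold more than the balance limit. SQLite is only a stand-in for MySQL here (an assumption made so the example runs anywhere), and the limit of 10000 is the value used by the Siddhi queries later in this article.

```python
import sqlite3

THRESHOLD = 10000  # balance limit used by the Siddhi queries later in this article

conn = sqlite3.connect(":memory:")  # SQLite stand-in for the MySQL clientDB
conn.execute(
    "create table accountInfoTable (clientAccountNo varchar(20), accountBalance double)"
)
conn.executemany(
    "insert into accountInfoTable values (?, ?)",
    [("AC67536", 250000), ("AC66736", 1000), ("AC66746", 10000)],
)

# For every account, compute whether its balance is strictly above the limit.
rows = conn.execute(
    "select clientAccountNo, accountBalance > ? from accountInfoTable "
    "order by clientAccountNo",
    (THRESHOLD,),
).fetchall()
above_threshold = {account: bool(flag) for account, flag in rows}
print(above_threshold)
```

Only AC67536 is above the limit. AC66746 holds exactly 10000, so it does not count as having more than the limit, which makes it a useful boundary case when testing the flow.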
    

Configuring the ESB to receive emails and for processing

  1. Start the ESB with a port offset of 1, since the CEP runs with the default offset (by default, WSO2 servers bind to port 9443).
  2. Enable the mail transport listener so that the ESB can receive emails from external email clients. Open the axis2.xml file in /repository/conf/axis2.xml and uncomment the following line:

    <transportReceiver name="mailto" class="org.apache.axis2.transport.mail.MailTransportListener"/>

  3. Since we use a MySQL database to store data, copy the MySQL Java connector to /repository/components/lib (it can be downloaded from https://dev.mysql.com/downloads/connector/j/5.0.html).
  4. Create a BAM profile in the ESB. Before a BAM mediator is added to a mediation sequence, the BAM mediator configuration must be in place. The configuration is a set of BAM server profiles, each holding the transport and credential data required to connect to the BAM/CEP server. Each BAM server profile defines one or more event stream configurations. A stream configuration contains Thrift API event stream information, such as the stream name and stream version, and the data to be extracted from the configuration context of the mediation sequence.

    • Navigate to the ESB management console and select the menu "Configure > BAM > Server Profile".
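    • If there are no existing profiles, add one using the "Add Profile" link.

    [ESB configuration: the XML markup was not preserved. The surviving values are listed below in their original order.]

    Database lookup (connection settings and query):

      root
      root
      jdbc:mysql://localhost:3306/clientDB
      com.mysql.jdbc.Driver
      select * from clientInfoTable where clientEmailAddress =?

    Mail transport parameters:

      pop.gmail.com
      5
      wso2cep@123
      text/plain
      995
      wso2cep.demo
      false
      995
      [email protected]
      javax.net.ssl.SSLSocketFactory
      pop3
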
  5. Information about the DBLookup mediator is available at https://docs.wso2.org/display/ESB481/DBLookup+Mediator

    An ESB sample that uses the DBLookup mediator:

    https://docs.wso2.org/display/ESB470/Sample+360%3A+Introduction+to+dblookup+Mediator
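The lookup that the mediator performs can be tried outside the ESB. The following Python sketch runs the same parameterized query against an in-memory SQLite database, which is used here only as a stand-in for MySQL. The lookup_client function and the example.com addresses are hypothetical placeholders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stand-in for the MySQL clientDB
conn.execute(
    "create table clientInfoTable (clientName varchar(100), clientEmailAddress varchar(100), "
    "clientPhoneNo varchar(20), clientAddress varchar(50), clientAccountNo varchar(20))"
)
# Placeholder address; use the address you will actually send emails from.
conn.execute(
    "insert into clientInfoTable values (?, ?, ?, ?, ?)",
    ("Mohan", "mohan@example.com", "094771117672", "California", "AC67536"),
)

def lookup_client(email_address):
    """Return the client row for a sender address, or None for an unknown sender."""
    cursor = conn.execute(
        "select * from clientInfoTable where clientEmailAddress = ?", (email_address,)
    )
    return cursor.fetchone()

print(lookup_client("mohan@example.com"))
print(lookup_client("stranger@example.com"))
```

A known sender yields the full client row, whose attributes become part of the event sent to the CEP. An unknown sender yields no row, so no customer data is available to forward.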

    Now we have completed configuring the ESB for the above scenario. If you send an email to the above configured email address, the ESB can read that email and use it for processing. The ESB will create a thrift event (in the CEP we call this WSO2Event) and send it to the CEP as a stream. When sending the first event, the ESB will forward the stream definition to the CEP to identify the events.

    Given below is the stream definition that is created by the ESB in the CEP.

      
    StreamDefinition{
            streamId='loanRequestStream:1.0.0',
            name='loanRequestStream',
            version='1.0.0',
            nickName='loanRequest',
            description='Stream which contains events related to loan request',
            tags=null,
            metaData=[Attribute{name='tenant_id', type=INT}, Attribute{name='http_method', type=STRING}, Attribute{name='character_set_encoding', type=STRING}, Attribute{name='remote_address', type=STRING}, Attribute{name='transport_in_url', type=STRING}, Attribute{name='message_type', type=STRING}, Attribute{name='remote_host', type=STRING}, Attribute{name='service_prefix', type=STRING}, Attribute{name='host', type=STRING}],
            correlationData=[Attribute{name='activity_id', type=STRING}],
            payloadData=[Attribute{name='message_direction', type=STRING}, Attribute{name='service_name', type=STRING}, Attribute{name='operation_name', type=STRING}, Attribute{name='message_id', type=STRING}, Attribute{name='timestamp', type=LONG}, Attribute{name='fromAddress', type=STRING}, Attribute{name='mailSubject', type=STRING}, Attribute{name='emailBody', type=STRING}, Attribute{name='clientPhoneNo', type=STRING}, Attribute{name='clientName', type=STRING}, Attribute{name='clientResidenceAddress', type=STRING}, Attribute{name='clientAccountNo', type=STRING}, Attribute{name='soap_header', type=STRING}, Attribute{name='soap_body', type=STRING}],
    }
    

    It contains attributes in addition to those we defined in the BAM profile.

Configuring CEP

  1. Add the MySQL java connector to /repository/components/lib since MySQL is used as the database to create event tables.
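  2. Uncomment the mail transport sender block in axis2_client.xml in /repository/conf/axis2/ and fill in the necessary values:

    <transportSender name="mailto" class="org.apache.axis2.transport.mail.MailTransportSender">
        <parameter name="mail.smtp.from">[email protected]</parameter>
        <parameter name="mail.smtp.user">wso2cep.demo</parameter>
        <parameter name="mail.smtp.password">wso2cep@123</parameter>
        <parameter name="mail.smtp.host">smtp.gmail.com</parameter>
        <parameter name="mail.smtp.port">587</parameter>
        <parameter name="mail.smtp.starttls.enable">true</parameter>
        <parameter name="mail.smtp.auth">true</parameter>
    </transportSender>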
    
  3. Then create a data source for the clientDB database (see https://docs.wso2.org/display/CEP300/RDBMS+Datasources). The Siddhi event tables in this use case refer to it by the name mysql_datasource.

Now we have completed the necessary work to set up this scenario. We will discuss how to define the flow for the two use cases separately.

Loan request processing

  1. Creating an input event adaptor

    Since the ESB sends events (built from the emails received from customers) through the BAM profile, we need a WSO2Event input adaptor. By default, the CEP ships with an adaptor called DefaultWSO2InputEventAdaptor, which we use in this use case to receive events from the ESB.

    To create an adaptor, refer to the following link:

    https://docs.wso2.org/display/CEP310/Input+WSO2Event+Event+Adaptor

  2. Creating output event adaptors

    Filtered events are sent out from the CEP in two different ways.

    • Email adaptor - sends event output as emails. We create an email adaptor called “emailAdaptor” for this use case (refer to https://docs.wso2.org/display/CEP310/Output+Email+Event+Adaptor for more info).

    • MySQL adaptor - stores the loan request information. We create a MySQL adaptor called “mysqlAdaptor” for this use case (refer to https://docs.wso2.org/display/CEP310/Output+MySQL+Event+Adaptor for more info).

  3. Creating an input stream and event builder

    Next, we need the input event stream. In this use case the input stream is called loanRequestStream:1.0.0, and the ESB acts as the data source. When the ESB sends its first message to the CEP, the CEP creates the corresponding event stream and event builder with the following default names.

      Event stream created : loanRequestStream:1.0.0

      Event builder created : loanRequestStream_1.0.0_builder
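The two default names follow a visible pattern built from the stream name and version. The Python sketch below reproduces that pattern. It is inferred only from the single example shown here, so treat it as an observation rather than a documented rule; both helper functions are hypothetical.

```python
def stream_id(name, version):
    """Stream identifier in the name:version form shown above."""
    return "{}:{}".format(name, version)

def default_builder_name(name, version):
    """Builder name following the pattern of the auto-created builder shown above."""
    return "{}_{}_builder".format(name, version)

print(stream_id("loanRequestStream", "1.0.0"))
print(default_builder_name("loanRequestStream", "1.0.0"))
```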

  4. Creating an execution plan

    Now we can write the Siddhi queries that process the events (refer to https://docs.wso2.org/display/CEP300/Configuring+Execution+Plans for more info).

    We import loanRequestStream:1.0.0 as the input stream for processing and add the Siddhi queries given below. From those we export two event streams as output streams.

      Siddhi queries

      The queries work as follows.

      • Queries 1 and 2 define event tables that are mapped to tables in the MySQL database. accountInfoTable holds customer account information, and blackListedCustomerInfoTable holds the blacklisted customers.
      • Query 3 checks each incoming loan request against the blacklist. Requests from customers who are not blacklisted are inserted into filteredLoanRequestStream.
      • Queries 4 and 5 look for three loan requests from the same customer within one hour (to catch junk mail). When the pattern matches, the customer is blacklisted.
      • Query 6 checks the blacklist again using the event table and passes the remaining requests to eligibleLoanRequestStream.
      • Queries 7 and 8 check whether the customer who sent the loan request has a sufficient account balance. If so, the loan request information is added to validLoanRequestStream; otherwise it is added to inValidLoanRequestStream.

      The blacklist checks in queries 3 and 6 use Siddhi filters.

        
      define table accountInfoTable (clientAccountNo string, accountBalance double) from ('datasource.name'='mysql_datasource', 'database.name'='clientDB', 'table.name'='accountInfoTable');
      
      define table blackListedCustomerInfoTable (clientAccountNo string) from ('datasource.name'='mysql_datasource', 'database.name'='clientDB', 'table.name'='blackListedCustomerInfoTable');
      
      from loanRequestStream[not (( clientAccountNo==blackListedCustomerInfoTable.clientAccountNo ) in blackListedCustomerInfoTable )]
      select fromAddress,mailSubject,emailBody,clientPhoneNo, clientName, clientResidenceAddress, clientAccountNo
      insert into filteredLoanRequestStream;
      
      from every  a1 = filteredLoanRequestStream -> 
      	    a2 = filteredLoanRequestStream [a1.clientAccountNo==a2.clientAccountNo] ->
      	    a3 = filteredLoanRequestStream [a2.clientAccountNo==a3.clientAccountNo] 
      	within 1 hour
      	select a1.clientAccountNo
      	insert into blackListedStream;
      
      from blackListedStream insert into blackListedCustomerInfoTable;
      
      from filteredLoanRequestStream[not (( clientAccountNo==blackListedCustomerInfoTable.clientAccountNo ) in blackListedCustomerInfoTable )]
      insert into eligibleLoanRequestStream;
      
      from accountInfoTable as accountInfoStream 
      join eligibleLoanRequestStream on accountInfoStream.clientAccountNo == eligibleLoanRequestStream.clientAccountNo and accountInfoStream.accountBalance > 10000 
      select eligibleLoanRequestStream.clientAccountNo as clientAccountNo,eligibleLoanRequestStream.clientName as clientName , eligibleLoanRequestStream.clientResidenceAddress as address , eligibleLoanRequestStream.clientPhoneNo as phoneNo
      ,eligibleLoanRequestStream.fromAddress as emailAddress, eligibleLoanRequestStream.emailBody
      insert into validLoanRequestStream;
      
      from accountInfoTable as accountInfoStream 
      join eligibleLoanRequestStream on accountInfoStream.clientAccountNo == eligibleLoanRequestStream.clientAccountNo and accountInfoStream.accountBalance <= 10000 
      select eligibleLoanRequestStream.clientAccountNo as clientAccountNo,eligibleLoanRequestStream.clientName as clientName , eligibleLoanRequestStream.clientResidenceAddress as address , eligibleLoanRequestStream.clientPhoneNo as phoneNo
      ,eligibleLoanRequestStream.fromAddress as emailAddress
      insert into inValidLoanRequestStream;
      

      Above, we have exported validLoanRequestStream and inValidLoanRequestStream from the execution plan.

      The above sample uses event tables, patterns and filters, which are explained further in [1], [2] and [3].
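The blacklisting logic can be easier to follow as ordinary code. The Python sketch below approximates the filter, pattern and blacklist queries. It is a simplified model and not a faithful reimplementation of Siddhi pattern semantics. It assumes requests arrive in timestamp order and that the blacklist update is visible before the second blacklist check runs. The LoanRequestFilter class is hypothetical.

```python
from collections import defaultdict, deque

ONE_HOUR_MS = 60 * 60 * 1000
REQUESTS_TO_BLACKLIST = 3  # the pattern chains three events: a1 -> a2 -> a3

class LoanRequestFilter:
    def __init__(self):
        self.blacklist = set()            # plays the role of blackListedCustomerInfoTable
        self.recent = defaultdict(deque)  # account number -> recent request timestamps

    def accept(self, account_no, timestamp_ms):
        """Return True if the request is eligible, False if the sender is blacklisted."""
        if account_no in self.blacklist:  # first blacklist check
            return False
        timestamps = self.recent[account_no]
        timestamps.append(timestamp_ms)
        # Forget requests more than one hour older than the newest one.
        while timestamp_ms - timestamps[0] > ONE_HOUR_MS:
            timestamps.popleft()
        if len(timestamps) >= REQUESTS_TO_BLACKLIST:
            self.blacklist.add(account_no)
        return account_no not in self.blacklist  # second blacklist check

MINUTE_MS = 60 * 1000
request_filter = LoanRequestFilter()
burst = [request_filter.accept("AC67536", t * MINUTE_MS) for t in (0, 10, 20, 30)]
spaced = [request_filter.accept("AC66736", t * MINUTE_MS) for t in (0, 40, 80)]
print(burst, spaced)
```

Three requests within 20 minutes get the first account blacklisted, so its third and fourth requests are rejected. The second account sends three requests spread over 80 minutes and is never blacklisted.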

  5. Creating event formatters to send events out from the CEP

    We create two event formatters, one for validLoanRequestStream and one for inValidLoanRequestStream. They send replies to the clients’ email addresses.

      Event formatter to send mail for accepted loan requests

      Mapping

      Hi {{clientName}}

      Your loan request is accepted for consideration.

      We'll get back to you with a positive reply as soon as possible after reviewing your documents.

      If you have any questions please contact Direct Loan Customer Services at the telephone number (27) 456789 from 8 am to 5 pm, Monday to Friday. We look forward to the completion of your loan.

      Thanks,

      Bank of Trust

      Event formatter to send mail for rejected loan requests

      Mapping

      Hi {{clientName}}

      Dear valued customer, we regret to inform you that your loan request has not been accepted for processing due to insufficient balance in your account.

      If you have any concerns or questions please contact Direct Loan Customer Services at telephone number (27) 456789 from 8 am to 5 pm, Monday to Friday. We look forward to the completion of your loan.

      Thanks,

      Bank of Trust
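The {{clientName}} placeholder in both mappings is replaced with the matching attribute of the outgoing event. The Python sketch below illustrates the idea of that substitution. It is not the CEP's implementation, and the render function is hypothetical.

```python
import re

PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")

def render(template, event):
    """Replace each {{attribute}} placeholder with the matching event attribute."""
    return PLACEHOLDER.sub(lambda match: str(event[match.group(1)]), template)

template = "Hi {{clientName}}\n\nYour loan request is accepted for consideration."
event = {"clientName": "Mohan", "clientAccountNo": "AC67536"}
print(render(template, event))
```

In this sketch a placeholder that names an attribute missing from the event raises a KeyError, which is a reminder that the mapping can only refer to attributes present in the stream being formatted.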

Conclusion

In this part of the article, we completed the first phase of the loan processing use case, in which the CEP handles loan requests. Part 2 covers the next phase, where the loan payment configuration is handled.

References

  1. https://docs.wso2.org/display/CEP310/Event+Table+Definitions
  2. https://docs.wso2.org/display/CEP310/Patterns
  3. https://docs.wso2.org/display/CEP310/Filters

 
