All posts by Iranga Muhuthanthri

Perfecting the Coffee Shop Experience With Real-time Data Analysis

Picture a coffee shop.

The person who runs this shop (let’s call her Sam) operates an online coffee ordering service. Sam intends to differentiate her value offering by providing a more personalized customer experience.

Offering customers their favorite coffee as they walk into the store, rewarding loyal customers with a free drink on special occasions – these are some of the things on her mind.

Further the value creation is not limited to her customers but extends to business operations such as real-time monitoring and management of inventory. Sam wants:

  • A reward system where points will be calculated based on order value. Once a reward tier point value is reached, the customer will be notified in real-time about an entitlement for a free drink
  • Inventory levels are updated in real-time on order placement. An automated notification is sent to suppliers in real-time as predicted re-ordering levels are reached

Overview of the solution


Understanding the customer is the first action in  providing a personalized experience. To do this, one must collect intelligence. In today’s digital business, customers pass through many ‘touchpoints’, leaving a digital trail. For example, many would search ‘health benefits of coffee’, some would publish a review on their favourite coffee type – and so on.           

Application Program Interfaces (or APIs) come into play here. In a business context, APIs are a way that businesses could expose their services externally, so that consumers, using an app or some technological interface, can subscribe to and access these services.

For example, Sam can have an “Order API”  that provides a way for consumers to order coffee from her shop using their mobile app.

What we now need is a simple way to create and publish said API and a central place for consumers to find and subscribe for this API. We also need proper security and an access control mechanism.

Data leaving through the API needs to be collected, stored and analyzed to identify patterns. For example, Sam would like to know what the most used combination of ‘coffee and flavors’ is, at which point of the day, by which type of users – which would be helpful for targeted promotion campaigns. For this, we need to understand the data that comes through. 

In base terms, the system requirements for developing such a solution are to: 

  • Design an API for end user access
  • Publish end user attributes (API data) for analytics
  • Process API data in real-time
  • Communicate outcomes

The solution requires to integrating API Management with real time event processing ,where the API end user attributes can be published to a steaming analytic engine for real time processing. There are many offering in the market that provides separate offering, however integrating these offering has it’s own challenges.

WSO2 offers a completely integrated 100% open source  enterprise platform that enables this kind of use case – on-premise, in the cloud, and on mobile devices.

We offer both an API management and streaming analytics product, architected around the same underlying platform, which enables seamless integration between these offerings.

WSO2 API Manager is a fully open source solution for managing all aspects of APIs including creating, publishing, and exposing APIs to users in a secure and scalable manner. It is a production-ready API management solution that has the capability of managing all stages of the API lifecycle in a massively scalable production environment.

WSO2 CEP is one of the fastest open source solutions available today, find events patterns in real-time milliseconds. It utilizes a high-performance streaming processing engine which facilitates real time event detection, correlation and notification of alerts, combined with rich visualization tools to help build monitoring dashboards.

WSO2 MSF4J is a lightweight framework that offers a fast and easy programming model and an end-to-end microservices architecture to ensure agile delivery and flexible deployment of complex, service-oriented applications.

Building an API for end user access

Let’s examine how we can build this with what we’ve listed above.

WSO2 API Manager includes architectural components, the API Gateway, API Publisher and API Store (Developer Portal), Key Manager, Traffic Manager and API Analytics. The API Publisher provides the primary capability to create and publish an API. The developer portal provides a way for subscribers to access the API.

API data to the streaming analytics engine is published through a default message flow. The solution we have in mind requires changing this default flow to capture and publish custom user data.

This is implemented  as a custom ‘data publishing mediator’ (see mediation extensions).  

In a nutshell, message mediation for simplification can be described as the inflow processing of messages, which could be modified,transformed, routed and many other ‘logics’.  Mediators are the implemented component of the logic, which when linked together creates a sequence or flow of the messages.  With API Manager tooling support, a custom flow is designed using a class mediator to decode, capture and publish end user attributes.

The custom sequence extracts the decoded end user attributes passed via JWT headers. The class mediator acts as a data agent that publishes API data to WSO2 CEP. The parameters passed to the class mediator include the connection details to CEP and the published event stream.

Real-time processing of API Data

To capture API data for real-time processing, the same stream definition and event receiver  is created and mapped to the stream. WSO2 provides a comprehensive set of extensionspredictive analytics capabilities  are added via the WSO2 ML extension.

Coffee reordering

The mechanics of reordering coffee based on a real-time analysis goes thus:

An event table represents the inventory details (‘drink name’ ‘ordered quantity’ , available quantity). The API data stream is joined with the event table and the available quantity in stock is reduced using the order quantity as and when events are received. When the reorder quantity level is reached, a email notification is published.

Real-time rewards

Similar to the approach above, the API data is joined with an event table, the event table represents the end user and the reward points generated per order. The reward points are equated to the order size and reward points are added with each new order placed. A reward limit threshold is defined, and when the limit is reached for a new order a notification is sent to the end user, offering a free drink.

Communicating outcomes

To communicate the  outcome of the real time processing event processing, WSO2 CEP provides capability to generate alerts via an SMS, email, user interface  etc. through event publishers. Email notification can be generated to alert management when re-order level are reached, as well as send an SMS to the client to notify offer for a free drink.

Meanwhile, the backend service for order processing is developed as a Java Microservice using WSO2 MS4FJ, which processes the order and can respond with the order id and cost.

Why Open Source?

As a small business, Sam’s resources are limited. Her best strategy for implementing the solution is open source, which offers lower startup costs and effort compared to the high licensing fee and complications involved with the commercial vendors.

Being open source also allows Sam to download, learn and evaluate the product without a high investment, thus minimizing her business risks. Depending on the results of her evaluations, he could go forward or ‘throw away’.

To grow in a competitive business  environment requires companies to differentiate. For small scale business  it becomes more of a challenge to implement such solution due to resource limitations. The seamless integrated capability provided by the open-source WSO2 Platform provides business a low risk and cost effective technology to build and deliver real-time business value to their clients.

The code for this use case

Listed below are what you need to recreate this discussion as a demo:

 

Pre-Requisites

Down the following products and set the port offsets to run the servers on the same server. WSO2 APIM runs on the default offset (0) while the WSO2 CEP offset is 4.

Products
WSO2 API Manager 2.0.0
WSO2 Complex Event Processor 4.2.0
WSO2 MSF4J (WSO2 MicroServices Framework for Java)
WSO2 App Cloud

For simplification purposes the inventory details are stored as tables of a MySQL database.

Execute MySQL database script db_script.mysql to create ‘Inventory’ Database and ‘Rewards’ and ‘orders’ table.

WSO2 MSF4J

  1. Execute the Kopi-service’ java microservice
    1. <WSO2_MSFJ_HOME>/kopi-service/target / Java -jar kopi-service-0.1.jar

Alternatively the java microservice can be deployed in the WSO2 App Cloud.

WSO2 CEP Setup

  1. Setup email configuration for the output event publisher
  2. Copy the JDBC driver JAR file for your database to <CEP_HOME>/repository/components/lib.
  3. Startup the server
  4. Configure a data source as “CEP-DS”.  Select Mysql as the RDBMS and set the database as ‘Inventory’ created.
  5. The created datasource is referenced when defining ‘Event Tables’ when creating the Siddhi queries.
  6. Deploy “Streaming-CApp” CApp . The correct deployment should visualize an event flow as depicted.

WSO2 API Manager Setup

  1. Configure WSO2 API Manager to pass end user attributes as JWT Token.
  2. Copy the custom data publisher implementation (org.wso2.api.publish-1.0-SNAPSHOT.jar ) library to $API_MGR_HOME /repository/components/lib
  3. Startup the Server.
  4. Login to the API Publisher:
  5. Create and publish an  API with the following details
    1. Context -<context>
    2. Version – <version>
    3. API Definition
      1. GET – /order/{orderId}
      2. POST -/order
    4. Set HTTP Endpoint: http://<server-ip:port>/WSO2KopiOutletPlatform/services/w_s_o2_kopi_outlet_service
    5. Change the default API call request flow by enabling message mediation and uploading file  datapublisher.xml as the ‘In Custom Sequence’.
  6. Login to the API Store and subscribe to the created API
  7. Invoke API with an order exceeding available quantity { “Order”:{ “drinkName”:”Doppio”, “additions”:”cream”, “orderQuantity”:2000 } }

 

Predicting re-order levels

The re-order quantity is initially calculated based on a ‘re-order factor(ROF) and order quantity formula (ROF * order quantity). Siddhi provides a machine learning extension for predictive analytics. The reorder quantity can be predicted using machine learning model.  

The re-order data points calculated previously (with the formula) can be used as data sets to generate a machine learning model with WSO2 Machine Learner. A predicted re-order quantity is calculated based on the “Linear Regression” algorithm, with the “Reorder factor (ROF) and coffee type  as the features.

The siddhi query for predicting reorder quantity is commented under ‘Predict reorder quantity using Machine Learning extensions. It can be executed by replacing the query under ‘Calculating reorder quantity’.

Appendix: code

Custom Sequence

<?xml version=”1.0″ encoding=”UTF-8″?>

<sequence name=”publish-endUser” trace=”disable” xmlns=”http://ws.apache.org/ns/synapse”>

 <log level=”full”/>

 <property expression=”get-property(‘$axis2:HTTP_METHOD’)” name=”VERB”

   scope=”default” type=”STRING” xmlns:ns=”http://org.apache.synapse/xsd”/>

 <property expression=”get-property(‘transport’,’X-JWT-Assertion’)”

   name=”authheader” scope=”default” type=”STRING” xmlns:ns=”http://org.apache.synapse/xsd”/>

 <log level=”custom”>

   <property expression=”base64Decode(get-property(‘authheader’))”

     name=”LOG_AUTHHEADER” xmlns:ns=”http://org.apache.synapse/xsd”/>

 </log>

 <property expression=”base64Decode(get-property(‘authheader’))”

   name=”decode_auth” scope=”default” type=”STRING” xmlns:ns=”http://org.apache.synapse/xsd”/>

 <script description=”” language=”js”><![CDATA[var jsonStr= mc.getProperty(‘decode_auth’);

var val= new Array();

val=jsonStr.split(“}”);

var decoded= new Array();

decoded= val[1].split(“enduser\”\:”);

var temp_str= new Array();

temp_str=decoded[1].split(‘\”‘);

mc.setProperty(“end_user”,temp_str[1]);]]></script>

 <property expression=”get-property(‘end_user’)” name=”endUser”

   scope=”default” type=”STRING”/>

 <log level=”custom”>

   <property expression=”get-property(‘endUser’)” name=”Log_Enduser”/>

 </log>

 <class name=”org.wso2.api.publish.PublishMediate”>

   <property name=”dasPort” value=”7619″/>

   <property name=”dasUsername” value=”admin”/>

   <property name=”dasPassword” value=”admin”/>

   <property name=”dasHost” value=”localhost”/>

   <property name=”streamName” value=”Data_Stream:1.0.0″/>

 </class>

</sequence>

Siddhi Query

/* Enter a unique ExecutionPlan */

@Plan:name(‘Predict’)

/* Enter a unique description for ExecutionPlan */

— @Plan:description(‘ExecutionPlan’)

/* define streams/tables and write queries here … */

@Import(‘API_Stream:1.0.0’)

define stream APIStream (drinkName string, additions string, orderQuantity double, endUser string);

@Export(‘allOrder_Stream:1.0.0’)

define stream allOrderstream (drinkName string, qtyAvl double, qtyPredict double);

@Export(‘predictStream:1.0.0’)

define stream predictStream (drinkName string, qtyPredict double);

@Export(‘Order_Stream:1.0.0’)

define stream orderStream (drinkName string, orderQty double, qtyAvl double, qtyOrder double, ROF double); 

@Export(‘reOrder_Stream:1.0.0’)

define stream reOrderStream (drinkName string, qtyAvl double, qtyPredict double);

@Export(‘outOrder_Stream:1.0.0’)

define stream outOrderStream (drinkName string, qtyOrder double, qtyReorder double, ROF double);

@Export(‘ULPointStream:1.0.0’)

define stream ULPointStream (subScriber string, points double);

@Export(‘totPointStream:1.0.0’)

define stream totPointStream (subScriber string, totPoints double);

@Export(‘FreeOrderStream:1.0.0’)

define stream FreeOrderStream (subScriber string, points double); 

@from(eventtable=’rdbms’, datasource.name=’CEP-DS’, table.name=’orders’)

define table drinkEventTable(drinkName string, qtyAvl double, qtyOrder double, ROF double);

@from(eventtable=’rdbms’, datasource.name=’CEP-DS’, table.name=’rewards’)

define table pointEventTable(subscriber string, points double);

from APIStream#window.length(0)as t join drinkEventTable as d

on t.drinkName==d.drinkName

select t.drinkName as drinkName, t.orderQuantity as orderQty, d.qtyAvl as qtyAvl,d.qtyOrder as qtyOrder, d.ROF as ROF

insert into orderStream; 

/* ——Drink Reordering————- */

/* —–Calculating reorder quantity———– */ 

from orderStream#window.length(0) as p join drinkEventTable as o

on o.drinkName==p.drinkName

select o.drinkName,o.qtyAvl,(p.orderQty* p.ROF) as qtyPredict

insert into allOrderstream;

/*———————Predict reorder quantity using Machine Learning extentions—————*/

/*

from orderStream

select drinkName,ROF

insert into ROF_Incoming;

from ROF_Incoming#ml:predict(‘registry://_system/governance/ml/Reorder.Model’,’double’,drinkName,ROF)

select drinkName, qtyReorder as qtyPredict

insert into predictStream;

from predictStream#window.length(0) as p join drinkEventTable as o

on o.drinkName==p.drinkName

select o.drinkName,o.qtyAvl, p.qtyPredict

insert into allOrderstream;

*/

/*——————————————————–*/

partition with (drinkName of allOrderstream)

begin @intro(‘query3’)

from allOrderstream[qtyPredict>=qtyAvl]

select drinkName,qtyAvl,qtyPredict

insert into #tempStream2;

from e2=#tempStream2

select e2.drinkName, e2.qtyAvl,e2.qtyPredict

insert into reOrderStream

end;

from orderStream[(qtyAvl-orderQty)>=0]#window.length(0)as t join drinkEventTable as d

on t.drinkName==d.drinkName

select t.drinkName as drinkName,(d.qtyAvl – t.orderQty) as qtyAvl

update drinkEventTable

on drinkName==drinkEventTable.drinkName; 

/*——————————————– */

 /*—– Offer free drink ——-*/

from APIStream

select endUser as subScriber ,orderQuantity as points

insert into ULPointStream;

 from ULPointStream as u join pointEventTable as p

on u.subScriber == p.subscriber

select u.subScriber as subscriber ,(u.points+p.points) as points

update pointEventTable

on subscriber==pointEventTable.subscriber;

from ULPointStream[not(pointEventTable.subscriber==subScriber in pointEventTable)]

select subScriber as subscriber,points

insert into pointEventTable;

from ULPointStream as u join pointEventTable as p

on u.subScriber == p.subscriber

select u.subScriber as subScriber,p.points as totPoints

insert into totPointStream;

 partition with (subScriber of totPointStream)

begin @info(name = ‘query4’)

from totPointStream[totPoints>=100]

select *

insert into #tempStream;

 from e1= #tempStream

select subScriber, totPoints as points

insert into FreeOrderStream

end ;

/*————————————*/