2014/09/09

Developing and Maintaining a Fleet Management System with WSO2 CEP

  • Yasassri Ratnayake
  • Associate Technical Lead - WSO2
Archived Content
This article is provided for historical perspective only, and may not reflect current conditions. Please refer to the WSO2 analytics page for more up-to-date product information and resources.

Introduction

The Global Positioning System (GPS) has been available for public use since the 1980s and has revolutionized the way objects are tracked. GPS satellites circle the earth twice a day in precise orbits and transmit signals to earth. GPS receivers use this information to calculate the user's exact location by trilateration. Essentially, the receiver compares the time a signal was transmitted by a satellite with the time it was received; the time difference tells the receiver how far away the satellite is. With distance measurements from at least four satellites, the receiver can determine the user's position. WSO2 CEP can process this GPS data to manage fleets of devices.
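The distance step can be illustrated with a small calculation: the range to a satellite is the signal's travel time multiplied by the speed of light. The Java sketch below is illustrative only, and the class name GpsRange is hypothetical; a real receiver also has to solve for the error of its own clock, which is one reason a fourth satellite is needed.

```java
// Time of flight -> distance: the idea behind each GPS range measurement.
final class GpsRange {
    // Speed of light in vacuum, in meters per second (exact by definition).
    static final double SPEED_OF_LIGHT_M_PER_S = 299_792_458.0;

    // Distance covered by a signal that left the satellite at sentSeconds and
    // reached the receiver at receivedSeconds (both read on the same clock).
    static double rangeMeters(double sentSeconds, double receivedSeconds) {
        return (receivedSeconds - sentSeconds) * SPEED_OF_LIGHT_M_PER_S;
    }

    public static void main(String[] args) {
        // A travel time of 0.07 s corresponds to roughly 21,000 km,
        // which is in the range of distances to GPS satellites.
        double meters = rangeMeters(0.0, 0.07);
        System.out.printf("%.0f km%n", meters / 1000.0);
    }
}
```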


Fleet management with WSO2 Complex Event Processor (CEP)

Real-time location finding using GPS is useful in many ways: the positioning information can be used to track objects, to manage their routes, to find a location on a map and so on. Real-world scenarios include tracking public transport vehicles and monitoring traffic at sea, in the air and on the ground.

Global positioning can be used to manage and monitor a fleet of devices. The fleet can be a set of vehicles, airplanes or ships, among other things. A fleet management system allows organizations to manage and orchestrate the fleet efficiently and productively. Depending on the GPS fleet tracking system used, remote users can view device locations and routes or access reports on the status of a vehicle.

The raw location information of a device may not be very useful without further processing. Therefore, an intermediate system can process the GPS streams in real time and trigger actions or make intelligent decisions based on them. This is where the WSO2 Complex Event Processor (CEP) comes into the picture.

In this article we will build a complete sample solution for a fleet management system that monitors public transportation. The final system looks as follows:

Figure 1


What is WSO2 CEP?

WSO2 CEP is a complex event processor that processes data in real time and is capable of handling millions of events per second. Among other things, it can process and alter GPS data streams and trigger actions when certain conditions are met.

A CEP takes data in the form of one or more streams. Each event in a stream carries several named attributes (name-value pairs) that can be processed. You then write Siddhi queries in the CEP to process the data according to your requirements. When an event matches a user-defined scenario, the CEP triggers an action.
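To make the idea of events and queries concrete, here is a toy model in plain Java rather than Siddhi: events are attribute maps, and a "query" is a condition paired with an action that runs for every matching event. The class name ToyQuery and the speed threshold of 60 are hypothetical, chosen only for illustration; this is not how WSO2 CEP is implemented internally.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model of a CEP query: a condition on event attributes plus an action.
final class ToyQuery {
    private final Predicate<Map<String, Object>> condition;
    private final Consumer<Map<String, Object>> action;

    ToyQuery(Predicate<Map<String, Object>> condition, Consumer<Map<String, Object>> action) {
        this.condition = condition;
        this.action = action;
    }

    // Feed every event of a stream through the query in arrival order and
    // return how many events matched (and therefore triggered the action).
    int process(List<Map<String, Object>> stream) {
        int matches = 0;
        for (Map<String, Object> event : stream) {
            if (condition.test(event)) {
                action.accept(event);
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Hypothetical speed rule: alert when the speed attribute exceeds 60.
        ToyQuery speedAlert = new ToyQuery(
                e -> ((Number) e.get("speed")).doubleValue() > 60.0,
                e -> System.out.println("Speed alert for device " + e.get("id")));
        List<Map<String, Object>> stream = List.of(
                Map.<String, Object>of("id", 1, "speed", 45.0),
                Map.<String, Object>of("id", 2, "speed", 72.5));
        speedAlert.process(stream);
    }
}
```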


The fleet management scenario

Monitoring public transportation systems, especially buses, is a common requirement in many South Asian countries. For instance, ignoring speed limits, waiting unnecessarily at bus halts and racing other vehicles are common issues with public transport services. Here we use WSO2 CEP to process the GPS streams transmitted by the devices and monitor them for rule violations.

We have implemented a sample fleet management system that addresses the following scenarios. When a scenario is triggered, the output console highlights the devices involved.

  • Speed Alert
  • Stationary Alert
  • Proximity Alert

The sample setup guide can be found here.

In this article we look at the proximity alert scenario. A proximity alert is raised when two devices (GPS-enabled vehicles) stay close to each other for a predefined period of time. The user sets the buffer, that is, the maximum distance between two devices that counts as close proximity, and the period of time the two devices may remain that close before the alarm is triggered. A typical use is checking whether public transport vehicles are racing each other for a considerable period of time.


Sample data set

A sample data set was used for demonstration purposes. Several approaches were used to obtain the geolocations along the routes: an Android application called ‘GPS Logger’, the free OSRM project and a custom application were combined to create an appropriate data set.

The data was prepared in a CSV file with the following columns:

ID, Timestamp, Longitude, Latitude, Speed, Angle, Accuracy
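A minimal sketch of reading one row of such a file, assuming comma-separated values in the column order above with no header row and no quoted fields. The record name GpsRow and the sample values are hypothetical; the timestamp is kept as raw text because the article does not specify its format.

```java
// One row of the sample CSV data set, in the column order
// ID, Timestamp, Longitude, Latitude, Speed, Angle, Accuracy.
record GpsRow(String id, String timestamp, double longitude, double latitude,
              double speed, double angle, double accuracy) {

    // Parse one comma-separated line (no header row, no quoted fields assumed).
    static GpsRow parse(String line) {
        String[] f = line.split(",", -1);
        if (f.length != 7) {
            throw new IllegalArgumentException("expected 7 columns but got " + f.length + ": " + line);
        }
        return new GpsRow(
                f[0].trim(),
                f[1].trim(),                      // raw timestamp text
                Double.parseDouble(f[2].trim()),  // longitude
                Double.parseDouble(f[3].trim()),  // latitude
                Double.parseDouble(f[4].trim()),  // speed
                Double.parseDouble(f[5].trim()),  // angle
                Double.parseDouble(f[6].trim())); // accuracy
    }

    public static void main(String[] args) {
        // Made-up row for illustration only.
        System.out.println(parse("1,2014-09-09 10:00:00,79.8612,6.9271,42.5,180,5"));
    }
}
```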

To simulate a real-world scenario, we developed a client application that generates input data streams simulating the streams coming from multiple devices.


Architectural Overview

Development Approach

Meeting this requirement needs a custom extension for Siddhi, the query language used inside WSO2 CEP. To handle the proximity check, the extension uses GeoTools, a free Java library for geospatial data analysis. The custom extension takes in every event that reaches the CEP, checks whether it is within the proximity range defined by the user and produces the output accordingly.

GPS devices generally store GPS information while they are disconnected from the server and resend it in batches once the connection is re-established. For the system to be accurate, this batch data must be filtered out of the incoming stream, so one of the main requirements in processing GPS data is to differentiate between batch inputs and real-time inputs from each device. The two kinds of input are put into separate streams, and only the real-time stream is processed further for the proximity condition.

Many technologies were adopted in developing the solution. The complete list of technologies used is given in Appendix A.


The flow of data

The data flow of the sample is illustrated in the following diagram:

Figure 2

The data stream is produced by the input stream generator, which takes in GPS data from many devices in parallel and forms a single stream directed to WSO2 CEP. Each element in the stream needs an id field so that the CEP knows which device it belongs to.

In the first step, the CEP divides the stream into two queues based on each event's timestamp: the real-time data queue, which is used for further processing, and the batch data queue, which holds older data re-sent by the devices.

The proximity check is performed on the real-time queue. The CEP adds a new field containing the notification for the front-end display and sends the resulting stream to ActiveMQ. The message consumer pulls the data from the queue and pushes it to the socket server, which in turn pushes it to the UI using websockets. The UI can be configured to show notifications for devices that satisfy the user-defined condition.


Input stream generator

The input stream generator can take any number of CSV files as input, process them simultaneously and create a concurrent data stream of many devices to replicate a real-world scenario. The generator outputs a JSON stream and converts the standard time values into milliseconds. The code for the stream generator can be found here.
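The generator's behaviour can be sketched as follows, under several assumptions not stated in the article: the timestamp column uses the pattern yyyy-MM-dd HH:mm:ss in UTC, device IDs are numeric, and rows are supplied in memory rather than read from files. The class StreamGenerator and its methods are hypothetical; the JSON field names are borrowed from the attributes used in the Siddhi query later in this article (id, time, lat, longitude, speed, angle).

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class StreamGenerator {
    // Assumed layout of the "standard time" column; adjust to the real files.
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Convert one CSV row (ID,Timestamp,Longitude,Latitude,Speed,Angle,Accuracy)
    // into a JSON event whose time field is in epoch milliseconds (UTC assumed).
    // Device IDs are assumed to be numeric, so they are written unquoted.
    static String toJson(String csvLine) {
        String[] f = csvLine.split(",");
        long millis = LocalDateTime.parse(f[1].trim(), TIME_FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        return String.format(
                "{\"id\":%s,\"time\":%d,\"longitude\":%s,\"lat\":%s,\"speed\":%s,\"angle\":%s}",
                f[0].trim(), millis, f[2].trim(), f[3].trim(), f[4].trim(), f[5].trim());
    }

    // Replay each device's rows on its own thread and merge the resulting
    // JSON events into one shared queue, like many devices reporting at once.
    static BlockingQueue<String> generate(List<List<String>> deviceFiles) {
        BlockingQueue<String> out = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, deviceFiles.size()));
        for (List<String> rows : deviceFiles) {
            pool.execute(() -> rows.forEach(row -> out.add(toJson(row))));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up rows for two devices; real input would be read from CSV files.
        List<List<String>> devices = List.of(
                List.of("1,2014-09-09 10:00:00,79.8612,6.9271,40,90,5",
                        "1,2014-09-09 10:00:05,79.8615,6.9271,42,90,5"),
                List.of("2,2014-09-09 10:00:00,79.8700,6.9300,35,270,5"));
        generate(devices).forEach(System.out::println);
    }
}
```

Because each device is replayed on its own thread, events from different devices interleave nondeterministically in the output queue, which is roughly what a CEP receiving live traffic from many vehicles would see.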


CEP flow

Figure 3

The stream received by the CEP goes through the default CEP configuration flow shown above. An HTTP input adaptor in the CEP receives the HTTP requests sent by the client. The input stream for the CEP is the GPS stream output by the client.

At the receiving end, a JSON event builder maps the parameters of the incoming JSON messages to the attributes of a stream defined in the CEP for further processing.

The CEP logic that processes the data and produces the required output is written in the execution plan, which receives the events from the event builder. The output stream of the execution plan is formatted as JSON and published through a JMS output adaptor to an ActiveMQ JMS queue, from which the message consumer pulls the data and sends it to the UI.


Message consumer

The message consumer continuously listens to the ActiveMQ queue, and whenever messages are available it pulls them from the queue and pushes them to the websocket server. The code for the message consumer can be found here.
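The consumer's listen-and-forward loop can be sketched without a broker. In the sketch below, a BlockingQueue stands in for the ActiveMQ queue and a Consumer<String> stands in for the websocket server. The class QueueForwarder is hypothetical; a real implementation would receive from ActiveMQ through the JMS API (for example javax.jms.MessageConsumer.receive(long timeout)) instead of polling an in-memory queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// In-memory stand-in for the message consumer: a BlockingQueue plays the
// ActiveMQ queue and a Consumer<String> plays the websocket server.
final class QueueForwarder implements Runnable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> socketServer;
    private volatile boolean running = true;

    QueueForwarder(BlockingQueue<String> queue, Consumer<String> socketServer) {
        this.queue = queue;
        this.socketServer = socketServer;
    }

    // Wait up to timeoutMillis for one message and forward it if one arrives.
    // Returns true if a message was forwarded.
    boolean pollOnce(long timeoutMillis) {
        try {
            String message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                return false;
            }
            socketServer.accept(message);   // push to the websocket server
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            running = false;
            return false;
        }
    }

    void stop() {
        running = false;
    }

    // Listen continuously; the short poll timeout lets stop() take effect quickly.
    @Override
    public void run() {
        while (running) {
            pollOnce(100);
        }
    }
}
```

Running it on its own thread, for example new Thread(forwarder).start(), gives the continuous listening behaviour; calling stop() ends the loop within one poll timeout.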


Socket server and UI

The socket server manages all websocket sessions. Whenever the server receives a message, it broadcasts it to all active websockets. The UI then processes the message and indicates the device location and the relevant alerts on the console. The code for the socket server and the UI can be found here.
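The broadcast behaviour can be modelled in a few lines of Java. This is a sketch only: the article's socket server is written in Jaggery, and here each websocket session is represented by a hypothetical Consumer<String> that delivers one text frame. With the standard Java WebSocket API (JSR 356), delivering to a session would instead look like session.getBasicRemote().sendText(message).

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Minimal session registry that broadcasts every message to all open sessions.
final class Broadcaster {
    // Each element models one connected browser session.
    private final Set<Consumer<String>> sessions = new CopyOnWriteArraySet<>();

    void open(Consumer<String> session) {
        sessions.add(session);
    }

    void close(Consumer<String> session) {
        sessions.remove(session);
    }

    // Send the message to every active session; returns how many received it.
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> session : sessions) {
            session.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

CopyOnWriteArraySet is chosen so that sessions can connect or disconnect while a broadcast is iterating, without a ConcurrentModificationException.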


Implementation

Separating real-time data from batch data is a basic and common requirement before any other processing takes place. If a GPS device loses connectivity to the remote server, it commonly holds its data and sends it in a batch once the connection is re-established. Such data must not be used in the real-time analysis in WSO2 CEP, so the initial requirement was to store it in a separate queue to be processed with WSO2 Business Activity Monitor.


Real time vs batch analysis

This is done with a Siddhi query that uses an event table:

  1. Store the latest date_time received from each device in the stream.
  2. When a new event arrives and the event table has no entry for that device, a new entry is created. Otherwise, the timestamp of the current packet is compared with the date_time value stored in the table for that device.
  3. If the current packet's time is earlier, the packet was stored by the device and re-sent, so it is dropped to the batch queue. Otherwise, the packet is sent to the real-time queue for further processing.
define table inMemoryTable (myid int, lastDateTime double);
from InputStream[not((id == inMemoryTable.myid) in inMemoryTable)]
select id as myid, time as lastDateTime
insert into inMemoryTable;
from InputStream [(time > inMemoryTable.lastDateTime and id == inMemoryTable.myid) in inMemoryTable]
select id as myid, time as lastDateTime
update inMemoryTable
on myid == inMemoryTable.myid;
from InputStream [(time > inMemoryTable.lastDateTime and id == inMemoryTable.myid) in inMemoryTable]
select id,time,lat,longitude,speed, angle
insert into filteredstream;
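The three steps above can be expressed in plain Java for comparison. This is a sketch, not the CEP implementation: a HashMap plays the role of inMemoryTable, the first packet from a device is treated as real time (matching step 2), and a packet whose timestamp is equal to or earlier than the stored value is routed to the batch queue. The class name RealTimeBatchSplitter is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the event-table logic: remember the latest timestamp
// per device and route each packet to the real-time or the batch queue.
final class RealTimeBatchSplitter {
    enum Route { REAL_TIME, BATCH }

    // device id -> latest timestamp seen (plays the role of inMemoryTable)
    private final Map<Integer, Double> lastDateTime = new HashMap<>();

    Route route(int id, double time) {
        Double last = lastDateTime.get(id);
        if (last == null || time > last) {
            // first packet from this device, or newer than anything seen so far
            lastDateTime.put(id, time);
            return Route.REAL_TIME;
        }
        // not newer than the latest packet: a stored packet re-sent in a batch
        return Route.BATCH;
    }
}
```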

In the execution plan, if we receive two events with the same timestamp and different device IDs, both packets are sent to the “geo:GeoProximity” Siddhi extension. It checks whether the two events are within a user-defined distance of each other. If they are, the function keeps track of how long they remain close, and if that time exceeds a user-defined period, it sets a flag and sends the output to the UI.

Calling the function in siddhi: geo:geoproximity(proximitydistance,lat,longitude,id,time,timeperiod)


Siddhi extension

Figure 4

Refer to the WSO2 CEP documentation on writing Siddhi extensions to understand the function format.

All geospatial operations are performed with GeoTools. For the geoproximity function, the proximity distance must be given in meters and the time period in seconds. The other parameters are passed through from the input event without any processing.

Because GPS devices report latitude and longitude in degrees, the function converts the user-supplied proximity distance from meters into degrees (1 degree is approximately 110574.61087757687 meters, the length of one degree of latitude at the equator).
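The conversion, and its main limitation, can be shown with a short sketch. The class DegreeConversion is hypothetical; it uses the constant from the extension for latitude, plus the standard WGS84 value of about 111,319.49 m for one degree of longitude at the equator.

```java
// Meters <-> degrees helpers using the constant quoted in the extension.
final class DegreeConversion {
    // Length of one degree of latitude at the equator, in meters.
    static final double METERS_PER_DEGREE_LAT = 110574.61087757687;
    // Length of one degree of longitude at the equator (WGS84), in meters.
    static final double METERS_PER_DEGREE_LON_AT_EQUATOR = 111_319.49;

    // Convert a buffer radius from meters to degrees, as done before buffer().
    static double metersToDegrees(double meters) {
        return meters / METERS_PER_DEGREE_LAT;
    }

    // A degree of longitude shrinks with the cosine of the latitude.
    static double metersPerDegreeLongitude(double latitudeDegrees) {
        return METERS_PER_DEGREE_LON_AT_EQUATOR * Math.cos(Math.toRadians(latitudeDegrees));
    }

    public static void main(String[] args) {
        double radius = metersToDegrees(100.0);   // a 100 m proximity distance
        System.out.println("100 m is about " + radius + " degrees");
        System.out.println("1 degree of longitude at 7 N is about "
                + metersPerDegreeLongitude(7.0) + " m");
    }
}
```

Because a degree of longitude shrinks away from the equator, a buffer drawn with a fixed radius in degrees is only approximately circular on the ground. Near the equator the two values are within about 1 percent of each other, but the gap grows with latitude: at 60° a degree of longitude is only about half as long.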

  
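// Build a point from the device's coordinates (the same axis order is used
// for every device) and draw a buffer around it; myRadius is in degrees.
Coordinate coord = new Coordinate(latitude, longitude);
Point point = geometryFactory.createPoint(coord);
Geometry buffer = point.buffer(myRadius);
mybufferList[i] = buffer;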

Within the function, GeoTools is used to check whether any two devices are in close proximity.

We maintain a Java HashMap that maps each device ID to the geometry buffer (a circle with the radius given by the user) around the last GPS location received from that device:

  
GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
// draw the buffer around the current point of this device
Coordinate firstpoint = new Coordinate(pointOnelat, pointOnelong);
Point pointOne = geometryFactory.createPoint(firstpoint);
// convert the user-defined proximity distance from meters to degrees
double bufferRadius = proximityDist / 110574.61087757687;
Geometry buffer = pointOne.buffer(bufferRadius);
// store the buffer against the device ID; put() replaces any existing
// entry, so the map always holds the latest buffer of each device
GeometryList.put(id1, buffer);

Then, for each incoming event, we iterate through the HashMap and check whether the device lies within the buffer of any other device:

  
if (!id2.equalsIgnoreCase(id1)) {
    // the buffer belongs to another vehicle
    if (pointOne.within(myBuffer)) {
        // this device lies inside the other vehicle's buffer

With the above condition we check whether two different devices are within each other's proximity buffers. If so, we store the timestamp of the first occurrence that made the proximity condition true. Each time the condition is true again, we compute the time elapsed since that first instance, and when it exceeds the user-defined time period the trigger is output. A HashMap holds the timestamp of the first instance for each pair of devices, and the entry is deleted whenever the condition becomes false, so the time period is only checked while the condition stays continuously true for that pair.

  
        if (!test.containsKey(id1 + "," + id2)) {
            // first time this pair is close: remember when the proximity
            // started, under both key orders
            String myarray1 = pckttime;
            test.put(id1 + "," + id2, myarray1);
            test.put(id2 + "," + id1, myarray1);
        }
        // check for how long the two devices have been close;
        // giventime must be in the same unit as the event time
        double timecheck = Double.parseDouble(test.get(id1 + "," + id2));
        timediff = time - timecheck;
        if (timediff >= giventime) {
            // close for at least the user-defined period: output true
            IDList.add(id2); // the list of close device IDs
        }
    } else {
        // the devices are no longer close: reset the timer for this pair
        if (test.containsKey(id1 + "," + id2)) {
            test.remove(id1 + "," + id2);
            test.remove(id2 + "," + id1);
        }

The output is returned as a string in the following form:

condition (true OR false) + the list of device IDs for which it became true
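For readers who want to experiment without GeoTools, the logic described above can be sketched in self-contained Java. This is not the extension's code: it swaps the GeoTools buffer and within() test for a haversine great-circle distance check, keeps the last known position per device as the HashMap above does, and tracks the first time each pair of devices came close. The class ProximityTracker and the exact output string ("false", or "true," followed by the close device IDs) are assumptions based on the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the proximity logic. The GeoTools buffer test is
// swapped for a haversine great-circle distance check.
final class ProximityTracker {
    private static final double EARTH_RADIUS_M = 6_371_000.0;   // mean Earth radius

    private final double proximityMeters;   // user-defined distance in meters
    private final long periodMillis;        // user-defined period, converted to ms
    private final Map<String, double[]> lastPosition = new TreeMap<>(); // id -> {lat, lon}
    private final Map<String, Long> closeSince = new TreeMap<>();       // pair -> first time close

    ProximityTracker(double proximityMeters, long periodSeconds) {
        this.proximityMeters = proximityMeters;
        this.periodMillis = periodSeconds * 1000L;
    }

    // Great-circle distance between two lat/lon points given in degrees.
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_M * Math.asin(Math.sqrt(a));
    }

    // Handle one real-time event and return "false", or "true," followed by
    // the IDs of devices that have been close for at least the period.
    String onEvent(String id, double lat, double lon, long timeMillis) {
        lastPosition.put(id, new double[] {lat, lon});
        List<String> closeIds = new ArrayList<>();
        for (Map.Entry<String, double[]> other : lastPosition.entrySet()) {
            String otherId = other.getKey();
            if (otherId.equals(id)) {
                continue;                                  // skip the device itself
            }
            // one key per unordered pair, e.g. "1,2" for both (1,2) and (2,1)
            String pair = id.compareTo(otherId) < 0 ? id + "," + otherId : otherId + "," + id;
            double[] p = other.getValue();
            if (haversineMeters(lat, lon, p[0], p[1]) <= proximityMeters) {
                closeSince.putIfAbsent(pair, timeMillis);  // start the timer once
                if (timeMillis - closeSince.get(pair) >= periodMillis) {
                    closeIds.add(otherId);
                }
            } else {
                closeSince.remove(pair);                   // apart again: reset
            }
        }
        return closeIds.isEmpty() ? "false" : "true," + String.join(",", closeIds);
    }
}
```

Measuring distance in meters directly avoids the degree-conversion approximation discussed earlier, at the cost of comparing every device against every other device on each event, just as the HashMap iteration in the extension does.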

You can download the code of the Siddhi extension from here and build the JAR (Java Archive) file. After building it, copy the JAR file into CEP_HOME/repository/components/lib so that the function can be called from a Siddhi query in the format mentioned above.

The proximity alert is indicated on the UI as follows:

Figure 5


Conclusion

This article showed how to use WSO2 Complex Event Processor to process GPS data and trigger a proximity alarm when two or more devices move close to each other. Through Siddhi extensions, WSO2 CEP can handle GPS streams and process GPS data, which makes it well suited to fleet management. Its highly customizable architecture lets you add custom extensions to enhance it and add new functionality easily.

The scenario explained in this article shows how to write a Siddhi extension using GeoTools and integrate it with the CEP, and how to write a Siddhi query that calls the extension to generate the required output.

This sample could be used to analyse scenarios where we get streams of GPS data from fleets of moving devices.


Appendix A

Technologies and tools used

GeoTools is a free (LGPL) GIS toolkit for developing standards-compliant solutions. It provides an implementation of Open Geospatial Consortium (OGC) specifications as they are developed. GeoTools is a contributor to the GeoAPI project, a vendor-neutral set of Java interfaces derived from OGC specifications, and implements a subset of those. It is written in Java and is under active development. It provides functions for geospatial analysis of GPS coordinates and is used as the basis of the custom Siddhi extension.


Leaflet is a modern open-source JavaScript library for mobile-friendly interactive maps that handles most map-related functionality. It is feature-rich, user-friendly and freely available. Leaflet was used to develop the map console that shows the device locations.


Jaggery is a framework for writing web apps and HTTP-focused web services covering all aspects of an application (front end, communication, server-side logic and persistence) in pure JavaScript. Jaggery is open source and released under the Apache 2.0 license. Jaggery was used to develop the UI and the websocket server.


Apache ActiveMQ is an open source message broker written in Java together with a full Java Message Service (JMS) client. ActiveMQ was used for event queuing.


HTML5 WebSockets is a technology that enables efficient two-way communication between the client and the server. It allows the server to push data to the UI and update it in real time. Websockets were used to transmit data to the front-end UI.

 
