We have entered an era where competitive advantage comes from analyzing, understanding, and responding to an organization’s data. When doing this, time is of the essence, and speed will decide the winners and losers.
The reality is that the value of most data degrades with time. It’s interesting to know that yesterday there was a traffic jam, or 10 fraud incidents, or 10 people who had heart attacks. From that knowledge, we can learn how to mitigate or prevent those incidents in the future. However, it is much better if we can gain those insights at the time they are occurring so that we can intervene and manage the situation.
Many of us have experienced how timely data can help our own lives. Let's say that you are driving along the road, and suddenly, the check engine light comes on. Sure, it’s annoying, but it’s also extremely useful to get an early warning, typically triggered when the vehicle deviates from normal behavior or hits a well-known failure pattern, such as when the charging path for the battery is not functioning. With luck, you can pull over to the nearest garage, get it fixed, and be on your way instead of becoming stranded on the road.
Automobile manufacturers have thought through the vehicle scenario described here. Similarly, some organizations are examining how certain use cases, such as stock markets, patient monitoring, and surveillance, can benefit significantly by having instant access to data in order to take timely, corrective actions. Still, there are many other use cases that have not yet been examined.
Over the years, we have tried to use databases and batch processing technologies, such as Hadoop and Spark, to analyze data and respond quickly enough to improve the outcome. Both of those technologies are well-suited to a number of applications, but they are not the best tools for responding to data quickly. Instead, the technology designed to handle use cases requiring immediate insights is known as stream processing or streaming analytics.
On the other hand, streaming, which is designed to do one pass through the data, is not a tool for all situations. Streaming is valuable for use cases where processing can be done with a single pass over the data or has temporal locality (where processing tends to access recent data). By contrast, training machine-learning algorithms and graph processing both require multiple passes through the dataset and therefore are a poor fit for stream processing.
This blog introduces technologies we can use for stream processing. It begins by explaining the programming model provided by the first wave of stream processing tools, such as Apache Storm, and their limitations. It then introduces streaming SQL and discusses its key operators while comparing and contrasting them with SQL. Next, it explores how to incorporate machine learning into streaming SQL. It concludes by looking at how streaming SQL fits into the analytics pipeline of an organization and its deployment challenges.
Stream processing takes in events from a stream, analyzes them, and creates new events in new streams. So, stream processing first needs an event source. It can be a sensor that pushes events to us or some code that periodically pulls the events from a source. One useful tool at this point is a message queue, which you can think of as a bucket that holds the events coming your way until you are ready to process them. We will come to this later.
Now you need to do something with the data. The old way of doing this was to write code that hooked up your processing with the event source and then process each event one by one. One useful unit in an event processing architecture is an “actor” or “agent” that accepts an event and produces another event. Such an actor conceptually converts one stream to another stream.
To understand these ideas, let's consider a boiler that has a sensor for measuring its temperature. For example, let’s assume that we want to detect when the temperature reported by the boiler is greater than 350°C. An actor can read the temperature stream, keep only the events above 350°C, and emit them as a new stream. This new stream can be used by another actor to create yet another stream. Actually, actors can be a bit more complex, accepting multiple streams in order to produce a composite stream. When you wire up multiple streams with actors, you end up with a processing graph. Events then flow through the graph and produce new events.
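The boiler example can be sketched as a tiny actor in Python. This is only an illustration of the idea of turning one stream into another. The event layout (a dict with `bid` and `t` fields) and the function name are assumptions made for this sketch.

```python
from typing import Dict, Iterable, Iterator

Event = Dict[str, float]  # assumed event layout: {"bid": ..., "t": ...}


def high_temperature_actor(events: Iterable[Event],
                           threshold: float = 350.0) -> Iterator[Event]:
    """Consume a temperature stream and emit a new stream that only
    contains the events whose temperature is above the threshold."""
    for event in events:
        if event["t"] > threshold:
            yield event


# Hypothetical readings, used only to exercise the actor.
readings = [{"bid": 1, "t": 340.0}, {"bid": 1, "t": 355.0}, {"bid": 2, "t": 351.5}]
alerts = list(high_temperature_actor(readings))
print(alerts)
```

Because the actor is a generator, its output can be fed straight into another actor, which is how a processing graph is wired together.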
Let’s assume we are calculating “30-second running-average” on a temperature stream inside the boiler and outside the boiler, and we want to produce an event when the two temperatures are too far apart. The graph to calculate this query would look as follows.
Here the first two actors will need to remember data about earlier events (e.g. sum and count). If the system fails, it will lose that data. Therefore, we need to build a way to save and recover those states.
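To make the state problem concrete, here is a minimal hand-coded sketch of that graph in Python: two running-average actors feeding a comparison actor. The class names, the snapshot format, and the alert threshold are assumptions for illustration. A real stream processor would manage this state and its recovery for you.

```python
from collections import deque
from typing import Deque, Optional, Tuple


class RunningAverage:
    """Actor that keeps a time-based running average.
    The window and its running sum are the state that is lost on a crash."""

    def __init__(self, span_seconds: float = 30.0) -> None:
        self.span = span_seconds
        self.window: Deque[Tuple[float, float]] = deque()  # (timestamp, value)
        self.total = 0.0

    def on_event(self, ts: float, value: float) -> float:
        self.window.append((ts, value))
        self.total += value
        # Evict readings that are span_seconds old or older.
        while self.window[0][0] <= ts - self.span:
            _, old = self.window.popleft()
            self.total -= old
        return self.total / len(self.window)

    def snapshot(self) -> dict:
        """Return the state that must be saved to survive a failure."""
        return {"span": self.span, "window": list(self.window)}

    @classmethod
    def restore(cls, state: dict) -> "RunningAverage":
        actor = cls(state["span"])
        for ts, value in state["window"]:
            actor.window.append((ts, value))
            actor.total += value
        return actor


class DifferenceAlert:
    """Actor that joins the two averages and emits an event when they diverge."""

    def __init__(self, max_gap: float) -> None:
        self.max_gap = max_gap

    def on_averages(self, inside: float, outside: float) -> Optional[dict]:
        gap = abs(inside - outside)
        return {"gap": gap} if gap > self.max_gap else None
```

Even in this small form, the snapshot and restore code is pure plumbing, which is the motivation for the tools discussed next.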
Instead of coding the above scenario from scratch, you can save time by using an event stream processor. An event stream processor lets you write logic for each actor, wire the actors up, and hook up the edges to the data source(s). An event stream processor will do the hard work of collecting data, delivering it to each actor, making sure they run in the right order, collecting results, scaling if the load is high, and handling failures. That's the good news—and the bad news is that there are many stream processors to choose from. Examples are Apache Storm, Apache Flink, and Kafka Streams to name three. I do not intend to help you select which one to use in this post. However, if you are starting, any of them will do.
Let’s assume that you picked a stream processor, implemented some use cases, and it's working. Now you sit down to savor the win. However, given that you can simply write SQL or something like SQL when doing batch processing, why should you have to write all this code? Shouldn’t you be able to do streaming with SQL?
The answer is yes, you should. Such streaming SQL exists. Again, there are many offerings. Open source solutions such as WSO2 Stream Processor, Storm, Flink, and Kafka all provide some support for SQL.
Unfortunately, unlike SQL, there is no standard streaming SQL syntax. There are many flavors, which follow SQL but have variations. Let’s look at examples using two languages: Siddhi Streaming SQL and Kafka KSQL.
Our boiler temperature use case written in streaming SQL would look as follows.
SQL is a powerful language for querying structured data. It is designed as a set of independent operators: projection, filter, joins, and grouping, which can be recombined to create very powerful queries. Among them, the first two operators work pretty much the same with streaming SQL as well.
The first operator is projection (a.k.a select), which lets you select a subset of properties from a stream and reshape a stream. For example, “Select bid” selects the boiler ID field and “avg(T) as T” reshapes the field T.
The second operator is a filter, or a “where clause” in SQL speak. The language we used (Siddhi) has a more compact notation for the where clause: “BoilerStream[t > 350]”.
Together, the query asks the system to “select events from BoilerStream whose property “t” is greater than 350, and from each event extract bid, calculate tF, and put them into a new stream”.
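For comparison, the same filter and projection written by hand in Python would look roughly like the sketch below. How tF is calculated is not spelled out here, so the Celsius-to-Fahrenheit conversion is an assumption, as are the function names.

```python
def celsius_to_fahrenheit(t):
    # Assumption: tF is the temperature converted to Fahrenheit.
    return t * 9 / 5 + 32


def select_hot_boilers(boiler_stream, threshold=350.0):
    """Filter plus projection over a stream of boiler events."""
    for event in boiler_stream:
        if event["t"] > threshold:                        # filter (where clause)
            yield {"bid": event["bid"],                   # projection: keep bid
                   "tF": celsius_to_fahrenheit(event["t"])}  # and reshape t into tF


# Hypothetical events; the "ts" field is dropped by the projection.
events = [{"bid": 7, "t": 360.0, "ts": 1}, {"bid": 8, "t": 100.0, "ts": 2}]
print(list(select_hot_boilers(events)))
```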
As with SQL, streaming SQL lets us manipulate streaming data declaratively without having to write code. Following are some advantages of streaming SQL languages:
- It's easy to follow and learn for the many people who know SQL.
- It's expressive, short, sweet and fast!!
- It defines core operations that cover 90% of problems.
- Streaming SQL language experts can dig in when they like by writing extensions!
- A query engine can better optimize the executions with a streaming SQL model. Most optimizations are already studied under SQL, and there is much we can simply borrow from database optimizations.
Let us walk through a few of the key operators. Just as SQL can cover most queries on data stored on disk, streaming SQL can cover most queries on streaming data. Without streaming SQL, programmers would have to hand-code each operator, which is complicated and hard work.
Concepts in SQL, such as “group by” and “having” clauses, usually work similarly with streaming SQL languages. Hence we will not discuss them.
Streaming SQL has two additional concepts that go beyond what SQL offers: windows and joins across streams, which handle the complexities of streaming. Let’s understand each of them.
Streaming SQL: Windows
In the use cases we considered so far, such as filters and projection, our actors only looked at a single event at a time. However, most real-world use cases, such as the boiler temperature we considered, need some form of working memory. Streaming handles this by letting users define a working memory as a window.
You can think of windows as a view on top of the stream. Events in the window are kept in memory and are available for further processing with operators.
The simplest available window is a sliding length window. It keeps the last N events in the window, and it is often used to calculate moving averages. The following picture shows a sliding window.
Textually, a window query looks as follows. It mostly looks like SQL, except for the window statement, which specifies the window size, and “avg(T)”, which tells how to aggregate the data into one field.
Siddhi Streaming SQL: Select bid, avg(T) as T From BoilerStream#window.length(10) insert into BoilerStreamMA
Kafka KSQL: Select bid, avg(T) as T From BoilerStream WINDOW HOPPING (SIZE 10, ADVANCE BY 1)
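To see what the engine does for us here, a rough Python equivalent of the length-window query is sketched below. It assumes events are dicts with `bid` and `T` fields and, like the query, averages over the last events regardless of which boiler they came from.

```python
from collections import deque


def sliding_length_average(stream, length=10):
    """Emit one averaged event per incoming event, computed over
    the last `length` events (a sliding length window)."""
    window = deque(maxlen=length)  # the oldest value is dropped automatically
    for event in stream:
        window.append(event["T"])
        yield {"bid": event["bid"], "T": sum(window) / len(window)}


# Hypothetical readings with a window of 3 to keep the example short.
readings = [{"bid": 1, "T": v} for v in (10.0, 20.0, 30.0, 40.0)]
for averaged in sliding_length_average(readings, length=3):
    print(averaged)
```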
You may have noticed that when I mentioned the window name, I said sliding-length window. Here sliding means that the window will trigger an output at every event. The other alternative is batch, where the window will only trigger an output at the end of the batch. The following figure shows the difference between batch and sliding windows. Arrows drawn from the ends of windows signal outputs.
Working memory might be defined in terms of numbers of events (length windows) or in terms of time elapsed (time windows). This creates four combinations:
- Sliding length window – keeps last N events and triggers for each new event.
- Batch length window – keeps last N events and triggers once for every N events.
- Sliding time window – keeps events that arrived within the last N time units and triggers for each new event.
- Batch time window – keeps events that arrived within the last N time units and triggers once at the end of the time period.
The above four are the most common scenarios. However, other windows are possible: for example, first (the first event in the stream), last (the last event in the stream), and session (the events in the same session).
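The difference between the window types is easier to see in code. The following Python sketch implements the batch length, sliding time, and batch time variants (the sliding length variant is the usual moving-average window). All function names are made up for this illustration, and the batch time window is triggered by the arrival of a later event instead of a timer, which is a simplification a real engine would not make.

```python
from collections import deque


def batch_length_window(events, n):
    """Trigger once for every n events, emitting the whole batch."""
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == n:
            yield batch
            batch = []


def sliding_time_window(events, span):
    """events are (timestamp, value) pairs in time order.
    Trigger on every event with the events from the last `span` time units."""
    window = deque()
    for ts, value in events:
        window.append((ts, value))
        while window[0][0] <= ts - span:
            window.popleft()
        yield list(window)


def batch_time_window(events, span):
    """Trigger once per period [k*span, (k+1)*span).
    Simplification: a period is closed when an event from a later period arrives."""
    batch, period = [], None
    for ts, value in events:
        current = int(ts // span)
        if period is not None and current != period and batch:
            yield batch
            batch = []
        period = current
        batch.append((ts, value))
    if batch:
        yield batch  # flush the last period when a finite input ends
```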
Windows give you the power to keep a working memory and look back at recent data efficiently. One common use case of this is moving averages, which smooth the data and track long-term trends. Following are some of the other use cases:
- Moving average or median removes noise.
- Multiple moving averages (e.g. 1 minute, 5 minutes, and 15 minutes) act as indicators of the direction of the trend and often are used with stock trading.
- Stock market technical analysis uses crossover points of moving averages to detect trend reversals.
- Processing on top of recent events windows can be used to detect anomalies.
- Regression on a recent window can be used to predict the next value and the trend.
Streaming SQL: Joins
If we want to handle data from multiple tables, we use the JOIN operator in SQL. Similarly, if you want to handle data from multiple streams, there are two options. First is to join the two and create one stream while the second is to write patterns across multiple streams.
Let’s explore joins. When we use joins, it is semantically similar to inner joins in database terminology.
The first challenge of joining data from multiple streams is that they need to be aligned as they come because they will have different time stamps. The second challenge is that, since streams are never-ending, the joins must be limited; otherwise the join will never end. The third challenge is that the join needs to produce results continuously as there is no end to the data.
As shown in the following picture, we handle this by keeping a window and collecting events from the purple stream (which is passive in the join) and then joining new events from the blue stream against the events in the window.
To understand how this works, let's consider Figure 6 below. Stream S1 has attributes “x” and “id” while stream S2 has attributes “y” and “id”. We are joining the two streams based on their ID properties.
The join query looks as follows with Siddhi Streaming SQL.
Select x, y From S1 as s1 join S2 as s2 on s1.id=s2.id insert into JoinedStream
Let’s explore in detail how it works.
- As shown in the picture, when e1 happens, it is collected in the window as it occurs in stream S2.
- When e2 happens, the event is joined with e1 and produces a new event.
- Event e3, which occurs in the active stream, is dropped as it does not match the ID with any events in the window.
- Event e4 is retained in the window because it is in stream S2.
- Event e5 matches with events in the window and generates two events as output.
- Event e6 is retained in the window, and the oldest event e1 is removed as the window is of size 2.
- Event e7 matches with events in the window and produces two events as output.
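The walkthrough above can be replayed with a short Python simulation. The event IDs and values below are assumptions chosen so that the matches line up with the steps described. The window of size 2 on S2 follows the walkthrough.

```python
from collections import deque


def windowed_join(events, window_size=2):
    """events are (name, stream, id, value) tuples in arrival order.
    S2 is the passive stream kept in a length window; S1 is the active
    stream whose events are joined against the window."""
    window = deque(maxlen=window_size)  # the oldest S2 event is evicted automatically
    for name, stream, key, value in events:
        if stream == "S2":
            window.append((name, key, value))
        else:
            for w_name, w_key, y in window:
                if w_key == key:
                    yield {"x": value, "y": y, "pair": (name, w_name)}


# Hypothetical events mirroring e1..e7 (ids are assumed).
events = [
    ("e1", "S2", 1, 10),
    ("e2", "S1", 1, 1),   # joins with e1
    ("e3", "S1", 2, 2),   # no matching id in the window, so it is dropped
    ("e4", "S2", 1, 20),
    ("e5", "S1", 1, 3),   # joins with e1 and e4
    ("e6", "S2", 1, 30),  # e1 is evicted from the window
    ("e7", "S1", 1, 4),   # joins with e4 and e6
]
for joined in windowed_join(events):
    print(joined)
```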
For the above model to work, at least one stream has to be bounded or in other words must have a window attached. This leads to two cases as shown below.
The first scenario is what we saw earlier, where one stream (S2) is bounded by a window, and events retained in the window are matched against events coming in the second stream (S1). Only events in the second stream (S1) will trigger an output.
On the other hand, a two-window query (described below) will retain events coming from both streams in their windows, and a new event arriving in either stream can trigger a match and an output.
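A two-window join can be sketched in Python as follows. This is an illustrative simplification, not how any particular engine implements it: each arriving event is first matched against the other stream's window and then retained in its own window.

```python
from collections import deque


def two_window_join(events, window_size=2):
    """events are (stream, id, value) tuples in arrival order, where
    stream is "S1" (carrying x) or "S2" (carrying y)."""
    windows = {"S1": deque(maxlen=window_size), "S2": deque(maxlen=window_size)}
    for stream, key, value in events:
        other = "S2" if stream == "S1" else "S1"
        # Events from either stream can trigger output.
        for other_key, other_value in windows[other]:
            if other_key == key:
                x, y = (value, other_value) if stream == "S1" else (other_value, value)
                yield {"x": x, "y": y}
        windows[stream].append((key, value))
```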
Streaming SQL: Applying patterns
The operators we have discussed so far naturally extend from SQL. Although the windows are a new addition, they are a natural adaptation of the idea of limited working memory to the never-ending stream. The next idea, patterns, goes beyond SQL and opens up a new set of use cases that are hard to do with the SQL model, yet natural and widely usable in the streaming world.
As we discussed already, streaming extends the SQL model (a.k.a. relational model) by adding time into the event and accepting that each data point occurs at a specific point of time. From that point, it is natural to think about the order of events.
Considering event ordering, the most natural problem is asking, “Is Event A followed by Event B?”. For example, let’s assume that we want to detect if the temperature changed more than 20% within five minutes. To detect this, you will need to track the temperatures for the last five minutes and match them against each new event.
The “happens after” relationship is a key component in a wide variety of use cases. Following are some of the use cases:
- Detect the rise or fall of a value.
- Detect a peak or a trough.
- Detect chart patterns (e.g. for stock markets).
- Identify a behavior signature (e.g. walking, running).
- Check for the completeness of a task. (Does the reply to an email come within an hour?)
Although the “happens after” pattern is simple and widely used, detecting it with SQL is very complicated. The only way to do that is to join a table (or stream) against itself, which is very expensive and becomes complicated if you try to detect multiple “happens after” relationships.
You might ask why this wasn't a concern with basic SQL. There are two main reasons. First, time was not an integral part of the SQL model, hence patterns were not often considered. Second, it was very hard to implement with the SQL model, hence no one wanted to do it.
Streaming SQL includes an operator called “pattern” that can handle this and similar use cases.
The following query shows the operator in action.
Select bid, ts as T From BoilerStream as b1 -> BoilerStream[(T - b1.T)/b1.T > 0.2 ] insert into BoilerIncreasedStream
Here the “->” symbol represents the “happened after” relationship. The following figure shows how the operator will work.
It will track each event pair to see whether “event A is followed by C” happens and trigger an output when it does. For example, in the above scenario, it triggers for “AABC”, “ABC”, and “AC” as those are the cases where “event A is followed by C” occurs.
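To appreciate what the pattern operator saves us from, here is a hand-written Python sketch of the temperature-rise check. It rests on several assumptions that the query does not state: events carry `bid`, `T`, and `ts` (seconds) fields, matching is done per boiler, every earlier event starts its own partial match, a partial match fires at most once, and partial matches expire after five minutes.

```python
def detect_rise(events, ratio=0.2, within=300.0):
    """Emit an event whenever a later reading is more than `ratio` higher
    than an earlier reading from the same boiler within `within` seconds.
    Assumes events arrive in timestamp order and T is positive."""
    pending = []  # earlier events still waiting for a matching later event
    for event in events:
        still_waiting = []
        for first in pending:
            if event["ts"] - first["ts"] > within:
                continue  # the partial match expired
            same_boiler = first["bid"] == event["bid"]
            if same_boiler and (event["T"] - first["T"]) / first["T"] > ratio:
                yield {"bid": event["bid"], "first_ts": first["ts"], "ts": event["ts"]}
            else:
                still_waiting.append(first)
        still_waiting.append(event)  # the new event may start a later match
        pending = still_waiting
```

Keeping track of pending events, expiry, and matching by hand quickly becomes error-prone, which is why a dedicated operator is valuable.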
Hence by adding patterns, streaming SQL languages go beyond conventional SQL and enable a new set of use cases as discussed above.
Patterns are actually much more powerful than just the “happens after” relationship.
Case 1: A single pattern query can include multiple “happens after” relationships.
Case 2: Each element in the query can also be annotated with quantifiers, such as “optional” and “any number of”, much like regular expressions. For example:
A->B?->C means A optionally followed by B and then C
A->B*->C means A followed by any number of Bs and then C
The above operators make this very powerful and let us describe very complex patterns using a simple syntax.
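One way to picture how such patterns are evaluated is a small state machine that advances as events arrive. The Python sketch below is an illustration only: the pattern notation ("A B? C"), the function names, and the strict-contiguity rule (any unexpected event abandons a partial match) are all assumptions of this sketch and may differ from how a given streaming SQL engine interprets "->".

```python
def compile_pattern(spec):
    """Turn a spec such as "A B? C" or "A B* C" into (event_type, quantifier) steps."""
    steps = []
    for token in spec.split():
        if token.endswith("?"):
            steps.append((token[:-1], "optional"))
        elif token.endswith("*"):
            steps.append((token[:-1], "any"))
        else:
            steps.append((token, "one"))
    return steps


def closure(steps, positions):
    """Add the positions reachable by skipping optional or any-number steps."""
    result, stack = set(positions), list(positions)
    while stack:
        p = stack.pop()
        if p < len(steps) and steps[p][1] in ("optional", "any") and p + 1 not in result:
            result.add(p + 1)
            stack.append(p + 1)
    return result


def match_end_positions(steps, event_types):
    """Feed events one at a time and return the indexes where a match completes."""
    active, completed = set(), []
    for i, event_type in enumerate(event_types):
        candidates = closure(steps, active | {0})  # a new match may start at any event
        advanced = set()
        for p in candidates:
            if p < len(steps) and steps[p][0] == event_type:
                # An any-number step may repeat; other steps move forward.
                advanced.add(p if steps[p][1] == "any" else p + 1)
        active = closure(steps, advanced)
        if len(steps) in active:
            completed.append(i)
            active.discard(len(steps))
    return completed
```

For example, `match_end_positions(compile_pattern("A B* C"), "ABBC")` reports a match ending at the final C, while the same input does not match "A B? C" because only one B is allowed there.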
Good examples of streaming SQL in action are available with the use cases described in the 2013 Distributed Event-Based Systems (DEBS) Grand Challenge. The challenge included a football game where the ball, boots of each player, and gloves of the goalie had sensors that emit an event that includes a timestamp, x,y,z locations, velocities (vx, vy, vz) and accelerations (ax, ay, az).
The use cases described how streaming SQL detected the ball possession by each player, which can be a key measure of their effectiveness.
The first query detected when the ball was kicked. A kick was detected when a player and the ball were within one meter of each other and the ball's acceleration increased to more than 55 m/s². This was done by joining the player streams and ball streams and then detecting the conditions (see the DEBS Grand Challenge reference noted above for detailed queries).
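The kick condition itself is simple once a player event and a ball event have been joined. The Python sketch below assumes positions are in meters, treats the 55 threshold as the magnitude of the ball's acceleration (an interpretation based on its unit), and uses the field names from the sensor description. The actual challenge queries are not reproduced here.

```python
import math


def is_kick(player, ball, max_distance=1.0, min_acceleration=55.0):
    """Return True when the ball is within max_distance of the player and the
    magnitude of the ball's acceleration exceeds min_acceleration."""
    distance = math.sqrt((player["x"] - ball["x"]) ** 2
                         + (player["y"] - ball["y"]) ** 2
                         + (player["z"] - ball["z"]) ** 2)
    acceleration = math.sqrt(ball["ax"] ** 2 + ball["ay"] ** 2 + ball["az"] ** 2)
    return distance <= max_distance and acceleration > min_acceleration
```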
Then the possession for player 1 was detected by looking for a kick by another player, followed by one or more kicks by player 1, followed by a kick by another player. Although a complicated condition, the pattern operator of streaming SQL could detect the two conditions in just two queries.
Just like regular expressions, streaming SQL patterns can be implemented with nondeterministic finite automata in streaming fashion and can be performant. With the above use cases, we observed performance in excess of a hundred thousand events per second.
With a streaming SQL pattern operator, the streaming SQL goes beyond standard SQL and enables a wide class of powerful, effective real-world use cases, such as Internet of Things (IoT) scenarios.
How to do machine learning
The operators we discussed so far let you handle most use cases and detect complex patterns. However, one common requirement that cannot be ignored in an analytics pipeline is machine learning. Let’s discuss how to incorporate machine learning into a streaming pipeline.
Machine learning has several steps. First, we run a learning algorithm to learn from a data set and create a model that can be used to predict the outcome for new inputs. The simplest way to incorporate machine learning into a streaming pipeline is to build a model using batch processing, export the model, and use the model within the streaming pipeline.
Using the model from the streaming pipeline can be done in one of two ways.
- We can write or use an extension that can run a machine-learning model that is created from a specific machine-learning platform. For example, there will be an H2O extension to run H2O models, a Spark extension to run Spark models, etc.
- Predictive Model Markup Language (PMML) lets us export machine-learning models as an XML document that is language neutral. A machine-learning framework, such as R, Python scikit-learn, or Apache Spark, can export some of its models as PMML. Then a PMML extension built into the streaming engine can apply the model against the data streams.
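Whichever option is used, the shape is the same: a model is trained offline, exported, loaded once, and then applied to each event. The Python sketch below illustrates only that shape. It does not use PMML or any real extension API; the JSON export format, the logistic model, its coefficients, and the field names are all assumptions invented for the example.

```python
import json
import math

# Hypothetical export produced by a batch training job (not PMML).
EXPORTED_MODEL = json.dumps({"intercept": -35.0, "weights": {"t": 0.1}})


def load_model(serialized):
    """Load the exported parameters and return a scoring function."""
    params = json.loads(serialized)

    def predict(event):
        score = params["intercept"] + sum(
            weight * event[name] for name, weight in params["weights"].items())
        return 1.0 / (1.0 + math.exp(-score))  # logistic function

    return predict


def score_stream(events, model, threshold=0.5):
    """Apply the model to each event and emit the ones scored above the threshold."""
    for event in events:
        probability = model(event)
        if probability > threshold:
            yield {**event, "failure_probability": probability}
```

Swapping in a newer model then means loading a new export and replacing the scoring function, which is the operational step the next paragraph is concerned with.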
One weakness of the machine-learning model approach is that the model becomes outdated over time as the data it describes changes, so it needs to be replaced with newer models. This is called concept drift in machine learning. Handling concept drift requires users to swap the models in production from time to time. Doing this is complex and complicates the deployment of the system.
There is a separate class of machine-learning algorithms called streaming machine learning, which can learn from data in a streaming manner and update the model as events come in. An example of this technology is the Apache SAMOA project. This frees users from the need to update the model. However, this is an active research area, and the current state of the art is significantly less accurate than batch versions. Hence this method has a very limited use as of now.
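To give a feel for what learning in a streaming manner means, here is a minimal Python sketch of a linear model updated with one stochastic gradient step per event. It is a generic textbook technique shown for illustration, not the algorithm used by Apache SAMOA, and the class name and learning rate are assumptions.

```python
class OnlineLinearRegression:
    """Single-feature linear model y = w * x + b, updated one event at a time."""

    def __init__(self, learning_rate=0.1):
        self.w = 0.0
        self.b = 0.0
        self.learning_rate = learning_rate

    def predict(self, x):
        return self.w * x + self.b

    def learn_one(self, x, y):
        """Take one gradient step on the squared error for a single event."""
        error = self.predict(x) - y
        self.w -= self.learning_rate * error * x
        self.b -= self.learning_rate * error
        return error


# Usage: the model is updated as each (x, y) event arrives, with no batch step.
model = OnlineLinearRegression()
for x, y in [(0.0, 1.0), (0.5, 2.0), (1.0, 3.0)]:
    model.learn_one(x, y)
```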
Hence, for now, we need to use the pre-built model method to use machine learning within the pipeline. Several streaming SQL engines (e.g. Siddhi) support this out of the box.
Fitting stream processing into the analytics pipeline
Finally, let’s explore how streaming SQL would fit into a system architecture.
The simplest architecture would look as follows.
A user would deploy a query into a stream processor, which listens to events and produces a new event stream. The second event stream can be sent to another query, saved to a disk, or attached to code that carries out an action.
If we are planning to run a real system, where failure of the system can have grave consequences, we need to worry about high availability and reliability. Unlike services we encounter in service-oriented or resource-oriented architectures, stream processors are stateful. Hence, we need to have a duplicate server to recover the lost state and ensure high availability (HA).
There are two solutions to this problem:
- Hot-warm high availability
- Queue-based recovery.
With hot-warm high availability, stream processors run side by side, and events are sent to both servers, but the outputs are dropped from one server. If the master server fails, then the second server can take over from that point onwards. Since the second server builds up the state anyway, no state is lost.
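The idea can be illustrated with a toy Python simulation. The classes below are hypothetical and ignore networking, failure detection, and everything else a real deployment needs. They only show why the second server has the full state at the moment it takes over.

```python
class AveragingProcessor:
    """A stateful processor: it keeps a running count and sum."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    def process(self, value):
        self.count += 1
        self.total += value
        return {"count": self.count, "avg": self.total / self.count}


class HotWarmPair:
    """Both processors see every event; only the active one's output is published."""

    def __init__(self):
        self.active = AveragingProcessor()
        self.standby = AveragingProcessor()

    def on_event(self, value):
        if self.standby is not None:
            self.standby.process(value)  # state is built up, output is dropped
        return self.active.process(value)

    def fail_over(self):
        """Simulate losing the active server and promoting the standby."""
        if self.standby is None:
            raise RuntimeError("no standby available to take over")
        self.active, self.standby = self.standby, None
```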
The following figure shows the setup.
The hot-warm high availability (HA) approach, however, needs one more step. If a node has failed, and the secondary has taken over, it will continue to work. But the system now runs with one node. Therefore, we have to add a replacement node to the system. To do that, we need to be able to start a new node and synchronize the state in the secondary to the new node, so the new node can be the new secondary.
With queue-based recovery, as shown in the following figure, the idea is to first place the events in a message queue. The stream processor then pulls the events from the queue, processes them, and produces new events.
Here HA is provided by the message queue where, even if the stream processor has failed, the message queue can continue to accept messages. However, when a failure happens, the state (e.g. events in the windows, state machine for patterns, etc.) will be lost. To handle this problem, the stream processor should periodically take checkpoints that record its location (cursor) in the queue, its state, and the events that it has sent out. So when a stream processor has failed, it can go back and start processing from the last cursor location in the queue, pick up the corresponding state from the checkpoint, and continue.
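The following Python sketch illustrates checkpoint-based recovery under heavy simplification. The queue is a plain list, the checkpoint is kept in memory (a real system would write it to durable storage), and the sketch does not track the events already sent out, so outputs produced after the last checkpoint are emitted again on recovery. All names are assumptions for this illustration.

```python
class CheckpointingProcessor:
    """Computes a running average over a queue and checkpoints its state."""

    def __init__(self, queue, checkpoint=None, checkpoint_every=2):
        state = checkpoint or {"cursor": 0, "count": 0, "total": 0.0}
        self.queue = queue
        self.cursor = state["cursor"]  # position of the next event to read
        self.count = state["count"]
        self.total = state["total"]
        self.checkpoint_every = checkpoint_every
        self.last_checkpoint = dict(state)

    def run(self, max_events=None):
        outputs = []
        while self.cursor < len(self.queue):
            if max_events is not None and len(outputs) >= max_events:
                break
            value = self.queue[self.cursor]
            self.cursor += 1
            self.count += 1
            self.total += value
            outputs.append(self.total / self.count)
            if self.cursor % self.checkpoint_every == 0:
                # Record the cursor together with the state it corresponds to.
                self.last_checkpoint = {"cursor": self.cursor,
                                        "count": self.count,
                                        "total": self.total}
        return outputs
```

A replacement processor started from `last_checkpoint` resumes at the saved cursor with the matching state, so no event in the queue is skipped.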
The decision whether to use hot-warm HA or queue-based recovery will depend on the use case. The advantages of hot-warm HA are that you can get an HA system with just two nodes, and it is simple to set up. A minimal HA setup can typically handle tens of thousands of events per second. About 80% to 90% of all use cases can be handled in this manner. However, this method scales poorly because if the work is being done with N nodes, the system needs 2N nodes to scale.
By contrast, queue-based recovery is complicated to set up, and five nodes are needed for the minimum HA setup. You need a message queue (JMS broker or Kafka), database, and a processing node. For the message queue and database, you will need to set up HA versions. If you choose to use a JMS broker, you can get about 5,000 messages per second with two nodes. If you choose Kafka, a minimal deployment needs about five nodes (four ZooKeeper nodes plus a Kafka node), and you can do hundreds of thousands of events per second. This method scales better. If the work can be done with N nodes, the system needs only N + 4 nodes.
In summary, if you have a small or medium workload, the hot-warm HA would be better as it provides a simple deployment with two nodes. However, in scenarios that have higher event rates, in excess of an average throughput of 20,000 events per second, queue-based recovery provides a better approach.
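The node arithmetic from the two approaches can be compared with a few lines of Python. The formulas are the 2N and N + 4 figures quoted in this post; the function names are made up for the sketch.

```python
def hot_warm_nodes(processing_nodes):
    """Hot-warm HA duplicates every processing node."""
    return 2 * processing_nodes


def queue_based_nodes(processing_nodes, overhead_nodes=4):
    """Queue-based recovery adds a fixed overhead for the queue and database."""
    return processing_nodes + overhead_nodes


for n in (1, 2, 4, 8, 16):
    print(n, hot_warm_nodes(n), queue_based_nodes(n))
```

With these figures the two approaches need the same number of nodes at N = 4, and queue-based recovery needs fewer nodes beyond that point.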
Many use cases need fast, real-time decisions. Although it is possible to implement them using databases or batch processing, these technologies quickly introduce complexities because there is a fundamental impedance mismatch between the use cases and the tools employed. In contrast, streaming provides a much more natural model to think about, capture, and implement those real-time streaming use cases. Streaming SQL provides a simple yet powerful language to program streaming use cases.
By applying the approaches described in this blog, I am confident you will be pleasantly surprised by the powerful use cases that streaming SQL enables you to do. To learn more, I encourage you to check out the article, Patterns for Streaming Real-time Analytics, which provides a description of the patterns and use cases that are possible with streaming SQL. Additionally, you can try out samples and play with streaming SQL by downloading WSO2 Stream Processor. It is available for free under the Apache License 2.0 at https://wso2.com/analytics.