The duration of the batch process is critical in production environments. Incremental processing - the simple concept of processing only what needs to be processed - is one way of introducing major boosts to the efficiency of this process.
Consider a product that does not support incremental processing: say an analytics script that summarizes data every day. The first time the summarization script is run, it would process the whole data set and summarize the data.
The next day, when the process is called again, this script needs to process the whole dataset in order to process the unprocessed data. Thus, it'll not only end up processing today's data: it'll waste time processing yesterday's data. As time goes on, this script ends up processing weeks and months of data just to get a day's worth of insight.
With incremental processing, the batch job processes only the data partition that still needs processing, not the whole dataset, most of which has already been processed; this improves efficiency drastically. The new script would process only the last day's worth of data, which removes the overhead of reprocessing data that has already been summarized.
The same saving applies to summarizations at any granularity, from minutes all the way up to years.
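The difference can be sketched in a few lines of Python. This is a minimal illustration, not DAS code: the function names, the (day, order ID) event shape, and the ISO date strings are all assumptions made for the example.

```python
from collections import Counter


def run_full(events):
    """Non-incremental run: recount every event, every time."""
    summary = dict(Counter(day for day, _order_id in events))
    return summary, len(events)


def run_incremental(events, summary, checkpoint_day=None):
    """Incremental run: recount only days on or after checkpoint_day.

    Days are ISO strings (YYYY-MM-DD), so string comparison is chronological.
    Updates summary in place and returns how many events were touched.
    """
    pending = [day for day, _order_id in events
               if checkpoint_day is None or day >= checkpoint_day]
    for day, count in Counter(pending).items():
        summary[day] = count  # overwrite the stale per-day count
    return len(pending)


# Day 1: three events exist, so the first run reads all of them.
events = [("2016-05-26", "a1"), ("2016-05-27", "b1"), ("2016-05-27", "b2")]
summary = {}
run_incremental(events, summary)

# Day 2: two new events arrive; only data from the checkpoint day on is read.
events += [("2016-05-27", "b3"), ("2016-05-28", "c1")]
touched = run_incremental(events, summary, "2016-05-27")
```

Here the second run touches four of the five events and leaves the count for 26th May alone. With months of history, the skipped portion is where the time is saved. A real store would apply the checkpoint filter through an index rather than scanning a list as this sketch does.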
Using incremental analytics with the new DAS
Incremental analytics uses the timestamps of the events when retrieving the data for processing. So, when defining streams for incremental analytics, you first need to add an extra field, _timestamp LONG, to the event payload.
When sending events, you can either put the timestamp in the _timestamp attribute or set it on each event when the event is created.
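Since _timestamp is declared as LONG, a common choice is to send it as milliseconds since the Unix epoch. The sketch below assumes that unit and assumes UTC (check both against your DAS setup); the helper name and the payload dictionary are hypothetical and only mirror the order fields used in this post.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to integer milliseconds since the epoch."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


# Hypothetical payload; the field names follow the order fields in this post.
# Assumption: _timestamp carries epoch milliseconds in UTC.
order_event = {
    "customerID": "43",
    "phoneType": "iPhone 6s",
    "OrderID": "sadl3122",
    "cost": 700.0,
    "_timestamp": to_epoch_millis(
        datetime(2016, 5, 27, 15, 32, 4, tzinfo=timezone.utc)
    ),
}
```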
In the Spark script, you need to add extra parameters to the table definition for the table to support incremental analytics.
If you do not provide these parameters, the table is treated as a typical analytics table, and every query that reads from it gets the whole table.
Here's an example:
Create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");
When you are done with the summarization, you need to commit the status to indicate that the data was read successfully. This is done via INCREMENTAL_TABLE_COMMIT orders;
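Why the commit matters can be shown with a toy model. This is not how DAS is implemented: the class, its methods, and the integer timestamps are hypothetical, and the model ignores the time-period rounding covered in the example with the orders table. It only illustrates that the read position advances when you commit, not when you read.

```python
class IncrementalSource:
    """Toy model of an incrementally read table with an explicit commit."""

    def __init__(self):
        self.rows = []          # (timestamp, value) pairs
        self.committed = None   # newest timestamp acknowledged by commit()
        self._last_read = None  # newest timestamp handed out by read()

    def read(self):
        """Return every row newer than the committed position."""
        rows = [row for row in self.rows
                if self.committed is None or row[0] > self.committed]
        if rows:
            self._last_read = max(ts for ts, _value in rows)
        return rows

    def commit(self):
        """Acknowledge the last read so later reads skip those rows."""
        if self._last_read is not None:
            self.committed = self._last_read


source = IncrementalSource()
source.rows = [(1, "a"), (2, "b")]
first = source.read()    # both rows
retry = source.read()    # both rows again, because nothing was committed
source.commit()          # summarization succeeded, so acknowledge the read
source.rows.append((3, "c"))
after_commit = source.read()
```

If a summarization run fails before the commit, the next run sees the same rows again, so no data is silently skipped.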
incrementalParams has two required parameters and an optional parameter.
incrementalParams "uniqueID, timePeriod, #previousTimePeriods"
uniqueID : REQUIRED
This is the unique ID of the incremental analytics definition. When committing the change, you need to use this ID in the incremental table commit command as shown above.
timePeriod : REQUIRED (DAY/MONTH/YEAR)
The duration of the time period that you are processing. If you are summarizing per DAY (the specified timePeriod in this case), then DAS has the ability to process the timestamp of the events and get the DAY they belong to.
Consider the following list of received events. The requirement is to get the total number of orders placed per day.
|Customer ID||Phone Type||Order ID||Cost||_timestamp|
|1||Nexus 5x||33slsa2s||400||26th May 2016 12:00:01|
|12||Galaxy S7||kskds221||600||27th May 2016 02:00:02|
|43||iPhone 6s||sadl3122||700||27th May 2016 15:32:04|
|2||Moto X||sdda221s||350||27th May 2016 16:22:10|
|32||LG G5||lka2s24dkQ||550||27th May 2016 19:42:42|
And the last processed event is:
|43||iPhone 6s||sadl3122||700||27th May 2016 15:32:04|
Assume that, when the script last ran, only two events had arrived for 27th May 2016, so the summary table holds a count of 2 for that day. Two more events for that day have arrived since then.
This is where the timePeriod parameter is used. For the last processed event, DAS calculates the “time period” it belongs to and pulls the data from the beginning of that time period onwards.
In this case, the last processed event
|43||iPhone 6s||sadl3122||700||27th May 2016 15:32:04|
would trigger DAS to pull data from 27th May 2016 00:00:00 onwards, so the day's total is recomputed over all four of its events.
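The rounding step can be sketched as follows. This is an illustration under assumptions, not the DAS implementation: period_start is a hypothetical helper, the timestamps are treated as naive local times, and only the three documented timePeriod values are handled.

```python
from datetime import datetime


def period_start(moment, time_period):
    """Round a datetime down to the start of its DAY, MONTH, or YEAR."""
    midnight = moment.replace(hour=0, minute=0, second=0, microsecond=0)
    if time_period == "DAY":
        return midnight
    if time_period == "MONTH":
        return midnight.replace(day=1)
    if time_period == "YEAR":
        return midnight.replace(month=1, day=1)
    raise ValueError(f"unsupported timePeriod: {time_period}")


# Timestamps of the five received events listed in this example.
received = [
    datetime(2016, 5, 26, 12, 0, 1),
    datetime(2016, 5, 27, 2, 0, 2),
    datetime(2016, 5, 27, 15, 32, 4),
    datetime(2016, 5, 27, 16, 22, 10),
    datetime(2016, 5, 27, 19, 42, 42),
]

last_processed = datetime(2016, 5, 27, 15, 32, 4)
start = period_start(last_processed, "DAY")         # 27th May 2016 00:00:00
to_read = [ts for ts in received if ts >= start]    # the four events of 27th May
```

Rereading from the start of the period, rather than from the last processed event, is what lets a per-day total be recomputed correctly when more events arrive later the same day.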
#previousTimePeriods : OPTIONAL (int)
Specifying this value allows DAS to pull data from earlier time periods as well. For example, if you set this parameter to 30, it fetches 30 more periods' worth of data.
In the above example, it would pull from 27th April 2016 00:00:00 onwards.
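The date arithmetic behind that result can be checked with a short sketch. As before, window_start is a hypothetical helper and the MONTH and YEAR branches are an assumption about how "previous periods" would extend to the other timePeriod values; only the DAY case is taken from the example.

```python
from datetime import datetime, timedelta


def window_start(last_processed, time_period, previous_periods=0):
    """Start of the read window: period start minus N whole periods."""
    midnight = last_processed.replace(hour=0, minute=0, second=0, microsecond=0)
    if time_period == "DAY":
        return midnight - timedelta(days=previous_periods)
    if time_period == "MONTH":
        # Count months from year 0 so the subtraction can cross year boundaries.
        months = midnight.year * 12 + (midnight.month - 1) - previous_periods
        return midnight.replace(year=months // 12, month=months % 12 + 1, day=1)
    if time_period == "YEAR":
        return midnight.replace(year=midnight.year - previous_periods,
                                month=1, day=1)
    raise ValueError(f"unsupported timePeriod: {time_period}")


last_processed = datetime(2016, 5, 27, 15, 32, 4)
start = window_start(last_processed, "DAY", 30)   # 27th April 2016 00:00:00
```

A larger value rereads more history on every run, so it trades some of the incremental saving for the ability to pick up late-arriving events.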
That's incremental analytics, which we're bringing in the 3.1.0 version of DAS. For more information do drop by wso2.com/products/data-analytics-server/.