TL;DR - Using Kinesis Data Analytics to get real-time insights in Flanders' traffic situation!
Why Real-time Data Analytics?
Real-time analytics allow businesses to react on the spot.
Seize opportunities when they occur, avoid problems before they happen or alert immediately when necessary.
Traditionally, batch-processing analytics was generally considered adequate as most BI users could meet their business goals by looking at weekly or monthly business numbers.
In this approach, recorded events were collected together based on the number of records or a certain period of time and stored somewhere to processed as a finite data set at a later point in time.
Recently however, a shift has occurred towards low latency real-time analytics, where data is processed as it arrives. Driving this shift are several factors such as increased access to data sources and improved computing power resulting from cloud computing.
Examples where businesses could benefit from real-time analytics are:
- The tracking of client behavior on websites (a visitor that is hesitating to convert may be persuaded to buy by a final incentive).
- Viewing orders as they happen allowing for the faster identification of and reaction to trends.
- The monitoring of traffic data to improve traffic flows or generate alerts for traffic jams.
How Can We Achieve Real-Time Analytics?
We can use AWS Kinesis Data Analytics to author SQL code that continuously reads and processes data in near real time. When using data in real time we speak of the hot path
for the data. "Hot", meaning that you want to do something with the data while it is still hot from just being produced.
For your hot path
you want to think about your data access patterns upfront!
Correctly defining your data access patterns upfront (and structuring your data accordingly) is extremely important. It will ensure you have data available that is compatible with the manner in which it is to be used, allowing for (near) real time reactions to it.
(Note: with data access pattern
we are referring to the way you will interact with your data: which fields will you query on, what attributes do you want to extract from nested data structures..)
What Kind Of Real-Time Analytics Will We Do?
For this blog post, we would like to explore the following practical example.
In Flanders, the "Flemish Traffic Institute" is continuously monitoring traffic on the highways. The visualization directly below shows all of the different measurement locations, where traffic is monitored.
This data is made available, on a per minute basis, via an API.
For a couple of critical locations in Flanders, we endeavored to set-up the following:
- Send out an alert when a traffic jam occurs.
- Analyze whether a traffic jam is emerging or disappearing.
- Get a real time status of the current situation.
Big Picture of Architecture
The government publishes the traffic data every minute as a big blob of xml
data, containing the information for all 4500 measurement locations in Flanders. We immediately convert this data into the JSON
format and divide it into in per-location measurement events. This preprocessing is achieved using AWS Lambda
. As the focus of this blog post is to discuss real-time analytics, we will not go deeper into the particulars of how we used Lambda to accomplish this.
The per-location measurement events are then streamed over Firehose
.
This Firehose
is used as an input for our Kinesis Data Analytics
application, which will provide real-time insights. Next, the results of our real-time analytics with Kinesis Data Analytics
are sent to a Kinesis Data Stream
, which can then be used by a Lambda
to, for example, generating traffic jam alerts or saving the results in DynamoDB
.
The format of the data arriving on Firehose is shown below. For the non-native Dutch readers, this data contains:
- A timestamp telling us when the measurement was taken.
- A unique id for the measurement location which can be linked to a physical place in Flanders.
- Information about the status of the measurement sensor.
- Information about the speed of vehicles of certain classes.
These classes represent the type of vehicle eg. motor, truck, car. For future reference let's remember that class 2 is the class representing the cars.
{
"beschrijvende_id": "H101L20",
"unieke_id": "3159", // unique location id
"lve_nr": "18",
"tijd_waarneming": "2020-04-06T17:51:00+01:00", // time of the observation
"tijd_laatst_gewijzigd": "2020-04-06T17:52:20+01:00",
"actueel_publicatie": "1",
"beschikbaar": "1",
"defect": "0", // is the sensor working correctly
"geldig": "0",
"verkeersintensiteit_klasse1": "0",
"voertuigsnelheid_rekenkundig_klasse1": "0",
"voertuigsnelheid_harmonisch_klasse1": "252",
"verkeersintensiteit_klasse2": "1",
"voertuigsnelheid_rekenkundig_klasse2": "110", // average speed of the cars (class2)
"voertuigsnelheid_harmonisch_klasse2": "110",
"...": "...",
"rekendata_bezettingsgraad": "6",
"rekendata_beschikbaarheidsgraad": "100",
"rekendata_onrustigheid": "86"
}
Kinesis Data Analytics: Nuts And Bolds
Source / Incoming Data
Let's dig deeper into the architecture. We'll start with the source for our analytics application, which is a Kinesis Firehose
stream.
Kinesis Firehose is a near real-time serverless service that can load data into your data lake or analytics tool and scales automatically.
Let's dissect that definition:
- Near real-time: data arrives on the stream and is flushed towards the destination of the stream on minimum intervals of 60 seconds or 1MiB.
- Serverless: you don't have to manage this stream yourself.
- Scales automatically: don't worry about sharding your stream.
AWS will take care of this for you.
It is important to note that there are 2 main options to stream your data, either Kinesis Firehose
or Kinesis Data Stream
. We decided to use Kinesis Firehose
, as we did not wish to handle sharding up or down ourself, as is required when using Kinesis Data Stream
. Firehose
also allows 5000 writes per second where Data Streams
will throttle you at 1000 writes per second (per shard). Firehose comes with the extra advantage that it can land your original data on S3 allowing you to build a data lake for batch processing later on. The other side of the medal is that Firehose
causes you to be near real-time instead of real-time.
If you would like to know more about Firehose
vs Data Streams
visit this page on the Lumigo Blog.
The Kinesis Firehose/Data Stream
that you choose as your input is your Streaming source
. You point this streaming source
to an in-application stream that is automatically created and will be named SOURCE_SQL_STREAM_001
by AWS.
Analytics
Now we dive into the heart of our real-time analytics flow, namely Kinesis Data Analytics
.
Kinesis Data Analytics is a way to analyze streaming data in real-time using SQL or integrated Java applications. https://aws.amazon.com/kinesis/data-analytics/
In this case we chose to use SQL to write our real-time analytics. In our Analytics Application
we'll use the Firehose
as the source for our application.
Notice that:
- The incoming data from `Firehose` will be available on the `In-application stream` named `SOURCE_SQL_STREAM_001`. As mentioned above this stream is named by AWS.
- There is a record preprocessor allowing you to do process or filter on the data before it enters the SQL analytics app.
Record preprocessor
This is a Lambda Function
which will receive batches of events and can transform these, drop these or let them pass on a one-by-one basis. The pseudo code below shows what the Lambda
does:
def handle(event, context):
output = []
for record in event['records']:
payload = base64.b64decode(record['data'])
result = dropped_or_okay(payload)
if result == 'Ok':
payload = preprocess_payload(payload)
output_record = {
'recordId': record['recordId'],
'result': result,
'data': base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
}
output.append(output_record)
return {'records': output}
We see that the Lambda
uses the dropped_or_okay()
method to filter records. A record that will be dropped gets result Dropped
, one that can pass gets result Ok
.
The preprocess_payload
method is used to modify the payload. In the preprocess_payload
method I remove some uneccessary fields from the payload. (The method is not shown here). In our case, we are only interested in cars (vehicle class 2 in our payload). So we will remove the data from the other vehicle classes in order to avoid storing and having to deal with unnecessary data.
Real time analytics with a Kinesis Data Analytics application
Kinesis Data Analytics allows you to do real time analytics using SQL concepts.
Before I dive into the actual queries, there are two very important concepts to discuss:
In-application-SQL-Streams
As mentioned above, the streaming source (a Kinesis firehose
in our case) is mapped to an in-application stream named SOURCE_SQL_STREAM_001
. In-application streams are Amazon Kinesis Data Analytics concepts. The stream continuously receives data from your source. Think of it as basically a table that you can also query using SQL. Since a continuous stream of data is flowing over it, we call it an in-application stream.
In-application-SQL-pumps
You can actually create multiple in-application streams. This means you need a way to move and insert data or query results from one stream to another. This is done by a SQL pump
. AWS puts it as follows: "A pump is a continuously running insert query that moves data from one in-application stream to another in-application stream." (source: AWS doc)
In-application SQL streams and SQL pumps are the core concepts of a Kinesis Data Analytics Application.
Let's see a basic example of what that looks like. Remember the structure of the streamed events shown above and the name of the source in-application stream. SOURCE_SQL_STREAM_001
.
CREATE OR REPLACE STREAM "INCOMING_STREAM" (
"uniqueId" INTEGER,
"speed" INTEGER,
"bezettingsgraad" INTEGER,
"recordTimestamp" TIMESTAMP);
CREATE OR REPLACE PUMP "INCOMING_STREAM_PUMP" AS
INSERT INTO "INCOMING_STREAM"
SELECT STREAM
"unieke_id",
"voertuigsnelheid_rekenkundig_klasse2",
"rekendata_bezettingsgraad",
TO_TIMESTAMP(CAST("tijd_waarneming" AS BIGINT) * 1000) AS "recordTimestamp"
FROM "SOURCE_SQL_STREAM_001";
Explanation:
- I am creating an intermediate in-application stream `INCOMING_STREAM`.
- I create a pump to insert data towards the intermediate stream.
- I define the query that will yield the results which will be pumped towards the intermediate streams.
Windowed queries
So, now we know about (intermediary) in-application streams and pumps which move data between those streams. Let's now have a look at how we can make a window on our stream and aggregate results within that window. There are two kinds of windows: time-based vs row-based and 3 types: stagger, thumbling and sliding windows.
Concerning time- and row-based windows the names says it all. You either specify the window size in terms of time or number of rows.
Different types of windows:
Sliding Windows
A continuously aggregating query, using a fixed time or rowcount interval.
Thumbling Windows
A continuously aggregating query, using definite time-based windows that open and close at regular intervals.
Staggering Windows
Stagger windows can help with use cases where related records do not fall into the same (by ROWTIME
) time-restricted window. A challenge which you cannot solve when using thumbling windows.
A detailed explanation and example of these windows can be found in the AWS docs. Originally it was hard for me to remember the syntax of each of these windows. Let me show you how you can recognize each type:
Sliding Windows syntax:
... WINDOW W1 AS (PARTITION BY ... RANGE INTERVAL 'x' MINUTE PRECEDING
Thumbling Windows syntax:
... GROUP BY ... , STEP("YOUR_IN_APPLICATION_STREAM".ROWTIME BY INTERVAL 'x' MINUTE)
Staggering Windows syntax:
... WINDOWED BY STAGGER (PARTITION BY FLOOR(EVENT_TIME TO MINUTE), ... RANGE INTERVAL 'x' MINUTE)
In our application we use the Sliding window to find out what the average speed over the last x minutes was. Below you can recognize three windows, indicating the last 10 minutes, 2 minutes and the current timestamp.
CREATE OR REPLACE PUMP "STREAM_PUMP_SPEED" AS
INSERT INTO "SPEED_SQL_STREAM"
SELECT STREAM
"uniqueId",
AVG("speed") over W0,
AVG("speed") over W2,
AVG("speed") over W10
FROM "INCOMING_STREAM"
WINDOW
W0 AS ( PARTITION BY "uniqueId"
RANGE INTERVAL '0' MINUTE PRECEDING),
W2 AS ( PARTITION BY "uniqueId"
RANGE INTERVAL '2' MINUTE PRECEDING),
W10 AS ( PARTITION BY "uniqueId"
RANGE INTERVAL '10' MINUTE PRECEDING);
When using a timestamp to start your window from, you can only use either ROWTIME or APPROXIMATE_ARRIVAL_TIME
- `ROWTIME` represents the time at which the event was inserted to the first in-application stream.
- `APPROXIMATE_ARRIVAL_TIME` represents the time at which the event was added to the streaming source.
That is the source which is feeding data towards your kinesis analytics application.
!! You cannot use a timestamp that originates from a field in your event to window by. This actually makes sense since you are working with real time data, which implicates the data should arrive in real time!
Using the LAG operator we can look back in our window and access the data of the previous event(s).
In the following example, I am using LAG
to look back in the current Sliding Window
and extract the speed from the previous event. This allows me to output a new event with both the current speed and the previous speed.
CREATE OR REPLACE PUMP "SPEED_CHANGE_PUMP" AS
INSERT INTO "SPEED_CHANGE_SQL_STREAM"
SELECT STREAM "s"."uniqueId",
LAG("s"."speed", 1, "s"."speed") OVER CURRENT_WINDOW AS "previousSpeed",
"s"."speed" AS "currentSpeed"
FROM "SPEED_SQL_STREAM" AS "s"
WINDOW CURRENT_WINDOW AS (PARTITION BY "s"."uniqueId" ROWS 3 PRECEDING);
Using static reference data
You can add reference data that you can use to enrich the query results of your application.
In our case the data of our application already contains an ID of the place where the data was measured. The name of the places themselves is not included in the data. However the ID of a place is statically linked to the name of that measurement location and thus could be found using reference data. This reference data has the following format:
It's a csv
file which in tab delimited. You can also use other separator. This files must be located on S3. When your application starts, it will read the file from S3 and make the data available as a table. A table that you can use in your queries. Below I joined a query result on location ID to retrieve the name of the measurement location from the reference data.
CREATE OR REPLACE PUMP "YOUR_IN_APPLICATION_STREAM" AS
INSERT INTO "YOUR_IN_APPLICATION_PUMP" ("uniqueId", "currentSpeed", ..., "location")
SELECT STREAM
"sdi"."uniqueId",
"sdi"."currentSpeed",
...,
"ml"."locatie",
FROM "SPEED_DIFF_INDICATOR_SQL_STREAM" AS "sdi" LEFT JOIN "measurementLocations" as "ml"
ON "sdi"."uniqueId" = "ml"."id";
Destination / outcome
Your kinesis analytics application outputs its result towards a destination.
We saw that our intermediary results are always pumped towards an in-application stream. To get these results out of our application we have to couple the in-application stream towards an exterior data stream like a kinesis Firehose or a kinesis data stream.
Mind that an in-application stream can only be coupled to one exterior data stream. If you want to output the same result towards two different destinations you'll have to create another in-application stream which receives the same data. That is also the reason why you see two different in-application streams coupled to the two destinations.
What to do with the real time results
In the architecture diagram above, you will notice that the kinesis data stream, which receives the analytics results, is coupled to a Lambda Function. That gives you opportunities. You could directly send out alerts based on the data that the function receives from the stream. Or you can save the results in a real time data store which you can use to always query for the current situation.
Here I choose the latter. I am storing the real time resuls in DynamoDB
. This table holds the current situation for each of the different measuring points in Belgium. I then provide an API through which a client can fetch the current traffic situation in Belgium for a certain point.
Another Lambda Function is listening on the change stream of this table. It's actually monitoring whether a traffic jam is present or not. If the traffic jam flag switches between True
or False
we send out a slack message to notify interested parties that a traffic jam has appeared or has dissolved.
Key Takeaways
Great, we just learned how we can use Kinesis data analytics
to get real time insights in our streamed data. In our case it gave us the possiblity to get an on-demand view of the traffic jams in Belgium and send out alerts for emerging traffic jams.
Kinesis data analytics is a great tool for real time analytics. There are some some knobs and twists which I think are really good to know!
Here are once again the key takeaways from this blog:
- Separate cold and hot flow of your data completely (real time vs batch).
- Real time date should be produced in real time and arrive in real time.
- Think about your data access pattern upfront.
- Mind the differences between `Kinesis Firehose` and `Kinesis Data Streams` to stream your data.
- Preprocess and/or filter your records before they go in your `Kinesis analytics application` by using a record preprocessor Lambda Fuction.
- You can use `Windowing` to aggregate or correlate results over a certain timespan.
- You can only use the timestamps `ROWTIME` and `APPROXIMATE_ARRIVAL_TIME`.
- Add static reference data to your application by making it available via S3.
- The core SQL concepts of the `Kinesis analytics app` are `SQL STREAMS` and `SQL PUMPS`.
Get in touch with the authors:
Triggered to team up with CloudWay?
Get in touch