Real-time analytics in the aviation industry - Part 2: leveraging Kinesis Analytics for SQL
How Kinesis Analytics for SQL fits into our Data Pipeline and what challenges we faced with it
This is the second and final part of my series about designing and building an event generation engine. In Part 1 I explained why we selected AWS Kinesis Analytics for SQL for the first iteration of our project, and I will now share how it fits into our existing architecture and the tweaks we made to ensure it worked for us.
Kinesis Analytics: The moving parts
The Data Source
Prior to this project, data was being captured from the aircraft, sent to a data pipeline for enrichment and stored in the data lake for analytics.
Our intention was to modify this data pipeline by adding a parallel step that could send source data both to the data lake and to Kinesis Analytics to generate events. We found that Kinesis Analytics can read from two streaming sources: Kinesis Data Streams and Kinesis Data Firehose. We decided to extend the data pipeline to push to a Kinesis Data Stream, which then feeds Kinesis Analytics for analysis.
Why Kinesis Data Streams? Kinesis Data Streams offers lower latency than Kinesis Data Firehose and we can be smart about sharding in the future to balance the workload.
The Destination
Kinesis Analytics can output data to Kinesis Data Streams, Kinesis Data Firehose or to a Lambda Function (usually for further data enrichment). In our case we want to push events to the data lake, so Kinesis Data Firehose is the right fit as it connects with S3 (where our data lake lives) and can buffer data before flushing to S3 (keeping the file size under our control).
The Transformation
Kinesis Analytics is responsible for running our transformation step. This is where our custom event generation logic lives.
Inside one - or many - Kinesis Analytics applications we map incoming data to an In-application stream. This object (let’s name it input_stream) pulls data from our Kinesis Data Stream and can be queried using SQL. We then check if an event must be generated by reading customer configurations (more on how we do it later in this post) and we write the output to another stream. This second stream (let’s name it output_stream) is mapped to a Kinesis Firehose Delivery stream, responsible for writing to the data lake.
The following SQL code gives you a taste of what a simple query inside Kinesis Analytics for SQL looks like:
-- Output stream definition: linked to Kinesis Firehose
CREATE OR REPLACE STREAM "OUTPUT_STREAM"
(
    "column_1" VARCHAR(100),
    "column_2" VARCHAR(100)
);

-- Output pump: responsible for generating an event
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS
INSERT INTO "OUTPUT_STREAM"
SELECT STREAM
    "input"."value_1",
    "input"."value_2"
FROM "INPUT_STREAM" "input";
Simply put, the image below represents what has been added to the existing pipeline.
The Pipeline Challenges
Now that we understand how Kinesis Analytics fits into the pipeline and what happens inside the service, I will share the challenges we faced with the updated pipeline.
1 - Pipeline Load
Kinesis Data Stream scales by adding shards. Every shard can handle up to 2MB/s for reads and 1MB/s for writes. The more shards you add the more you pay for the service.
As part of load testing - to reproduce what production looks like - we went over the 1MB/s write limit. The initial thought was to solve it by increasing the number of shards. However, we decided to try some alternatives first to keep cost under control:
- Compression: We experimented with compressing the payload using Python's zlib library. We were pleasantly surprised to see it reduce the payload size by 85%.
- Chunking: Instead of writing the full payload to the Kinesis Data Stream in a single call, we split it into chunks and sent fewer records per call, smoothing out the write throughput.
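As a rough illustration of the two techniques above, here is a minimal sketch in Python. The chunk size is an arbitrary value for illustration only (Kinesis Data Streams allows up to 1 MB per record, so anything well below that works), and the sample payload is made up:

```python
import zlib

# Illustrative chunk size; well below the 1 MB per-record Kinesis limit.
MAX_CHUNK_BYTES = 256 * 1024

def compress_payload(payload: bytes) -> bytes:
    """Compress the raw payload before writing it to the stream."""
    return zlib.compress(payload)

def chunk_payload(payload: bytes, chunk_size: int = MAX_CHUNK_BYTES) -> list:
    """Split a (compressed) payload into smaller pieces, one per stream record."""
    return [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]

# Repetitive telemetry-style payloads compress very well.
raw = b'{"altitude": 10000, "speed": 450}' * 1000
compressed = compress_payload(raw)
chunks = chunk_payload(compressed)
# Each chunk would then be written to the Kinesis Data Stream as one record.
```

Joining the chunks back together and decompressing yields the original payload, which is exactly what the consumer side of the pipeline needs to do.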
Both improvements gave us the confidence to run the Kinesis Data Stream with a single shard. In addition, we put alerting in place to let us know when we hit 80% of the limit so we can act before we start seeing throttling errors.
However, we did face a challenge with compression. Kinesis Data Analytics cannot read compressed data, so we needed a step between Kinesis Data Streams and Kinesis Data Analytics to decompress the payload. Fortunately, Kinesis Data Analytics has a feature that allows a pre-processing Lambda function to be invoked. We made this Lambda decompress the payload and return it to Kinesis Data Analytics for consumption. The image below illustrates the updated pipeline.
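A pre-processing Lambda of this kind could look roughly like the sketch below. It assumes the record shape Kinesis Data Analytics uses for pre-processing (a `records` list where each record carries a `recordId` and base64-encoded `data`, and the response marks each record `Ok` or `ProcessingFailed`); treat the exact handling as a simplified illustration rather than our production code:

```python
import base64
import zlib

def handler(event, context):
    """Decompress each incoming record before Kinesis Data Analytics reads it."""
    output = []
    for record in event["records"]:
        try:
            compressed = base64.b64decode(record["data"])
            decompressed = zlib.decompress(compressed)
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(decompressed).decode("utf-8"),
            })
        except zlib.error:
            # Flag records that fail to decompress instead of dropping them silently.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```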
2 - Kinesis Data Analytics - Reading Customer Configurations
This was the biggest challenge to overcome as part of the event generation engine. First, let’s introduce the available data objects inside a Kinesis Data Analytics Application:
- In-Application Stream: already mentioned in this article, this is the object used to reference data coming from the source, e.g. a Kinesis Data Stream or a Kinesis Firehose Delivery Stream.
- Reference Data Source: file stored on S3 that is used to enrich/join with the input data stream.
Our end goal was to make the event generation engine read customer configurations and join them with incoming streaming data to decide whether or not to generate an event. The obvious way forward was to store customer configurations in an S3 file and use the Reference Data Source object to apply our logic. However, this comes with problems we were looking to avoid:
- Maintaining the S3 file: customer configurations are constantly updated from the front-end and stored in a NoSQL database. Keeping the S3 file up to date would introduce other challenges, such as concurrency handling and S3's eventual consistency (which, incidentally, is no longer a problem now that S3 offers strong consistency).
- Kinesis Data Analytics updates: to reflect changes applied to the S3 file inside Kinesis Data Analytics, we would need to run an update command that puts the application into updating mode. Besides making the application unavailable while updating, this also breaks its statefulness. What do I mean by that? If you are using windowing, the application loses track of previous records and may not behave as you would expect.
How did we solve it?
To be honest, I don't think Kinesis Analytics for SQL offers a decent way of managing this type of scenario, where the data used to "enrich" streaming data changes quite often.
We concluded that the S3 file reference would not work at all, and as there was no other data object available, we ended up appending customer configurations to the streaming data itself and using SQL to extract the configuration from the in-application stream inside Kinesis Data Analytics.
Didn’t this approach bring the Pipeline Load headache back?
Well, no. That's because we don't enrich the payload with customer configurations at the source before sending it to the Kinesis Data Stream; instead, we took advantage of the pre-processing Lambda to do the job for us. Besides decompressing the payload, this Lambda now also reads customer configurations from the NoSQL database and appends them to the stream.
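Conceptually, the enrichment step looks something like the sketch below. The `fetch_customer_config` function and its field names are hypothetical stand-ins: in our pipeline this would query the NoSQL configuration table, but here it is replaced with an in-memory dictionary so the sketch is self-contained:

```python
import base64
import json
import zlib

def fetch_customer_config(customer_id):
    """Hypothetical stand-in for the NoSQL lookup of a customer's event configuration."""
    configs = {"customer-1": {"altitude_threshold": 12000}}
    return configs.get(customer_id, {})

def handler(event, context):
    """Decompress each record, then append the customer's event configuration."""
    output = []
    for record in event["records"]:
        payload = json.loads(zlib.decompress(base64.b64decode(record["data"])))
        payload["event_config"] = fetch_customer_config(payload.get("customer_id"))
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```

Downstream, the SQL inside Kinesis Data Analytics can then read the configuration fields straight off the in-application stream, with no reference data source involved.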
Scaling the Pipeline
Going forward we might have to scale this pipeline to accommodate more concurrent flights and extra pre-configured event definitions.
Where we might need to scale:
- Additional shards to Kinesis Data Streams (data load);
- Additional Kinesis Analytics Applications (more event definitions);
To make sure the above works as expected, we ran some experiments and validated that the pipeline's behaviour remains consistent even if we scale only one side. In summary, multiple Kinesis Data Stream shards work with a single Kinesis Analytics application, and multiple Kinesis Analytics applications can read from a single Kinesis Data Stream shard without concurrency issues.
The Future of the Event Generation Engine
The setup explained in this article can successfully handle the current requirements of this project and also the expected workload. There are, however, some factors that might lead us to iterate and potentially replace Kinesis Data Analytics for SQL in the future:
- Increased customer power to define event configurations: at this point in time, we - Spidertracks - define what an event configuration looks like and give our customers the ability to input their thresholds. If this product evolves to the point where customers take full control of event configuration, we might not be able to express that in pure SQL, and we would look at a programmatic approach using Spark Streaming or Apache Flink instead.
- Cost: at the moment we are using a serverless service, which is great from a low-maintenance point of view, but it comes at a higher cost compared to the optimisations we could make by running our own cluster.
That’s it. Thanks for reading about our journey!