Change the data paradigm while keeping the language. Find out how Apache Kafka® and Apache Flink® allow you to move from batch processing to streaming, still defining the data pipeline in SQL.
SQL streaming data pipelines on Aiven with Apache Kafka® and Apache Flink®
If data is the new gold, then data pipelines must be the mining shafts and SQL the pickaxe: they allow information to travel across the company from the sources to the decision-enabling dashboards. But driving the data assets to the right audience is only part of the problem; doing it in a timely manner is becoming more and more critical. The old days of looking today at a dashboard containing yesterday's data are gone: we need to analyse data, capture trends and detect outliers as soon as possible.
More and more companies are consequently moving away from batch and embracing streaming solutions, which enable near-real-time data pipelines. To make this transition easier, selecting a target technology that speaks a language similar to the original is usually a sensible choice, since existing skills can be reused with small adaptations.
The combination of Apache Kafka® and Apache Flink®, two open source projects aiming respectively at transmitting and computing streaming data, is therefore a good choice, since it enables the transition from batch to streaming while keeping the data pipeline definitions in data practitioners' most beloved language: SQL!
We previously wrote about the duo, showcasing how you could write your own streaming data pipelines using a dockerized version of Apache Flink®'s SQL client. This time we'll create new data pipelines on a different use-case and show how we can minimize the analytics latency. On top of this, there is more good news: we don't have to care anymore about running Apache Flink® ourselves, since we can rely on the managed services provided by Aiven!
The use case
For the purpose of this blog post, we are going to mimic an inbound streaming dataset of IoT CPU utilization measurements. Since IoT devices can be geographically distributed, messages can arrive with a delay or potentially out of order. Therefore, before further processing, we want to allow late arrivals of IoT records with a delay of up to 10 seconds. In rough terms, this means that we'll wait 10 more seconds before finishing off any window calculation.
When we monitor any type of hardware, we might want to check whether the device health parameters are within an optimal range and, if not, start raising alerts. We can achieve that by creating a data pipeline that filters all high CPU utilization records and pushes them to a new Apache Kafka® topic, where a downstream consuming application will then trigger the alerts. In the old batch days we would run the same query every few minutes to check for high values; with streaming we can redirect the problematic records as soon as they appear.
Checking every individual sample against the threshold might be a bit bursty, and we are OK with a single CPU sample going over the limit. On the other hand, we might want to calculate the average and maximum CPU level at 5-minute intervals, since those metrics can help us identify problematic CPUs with consistently heavy load. To achieve this second monitoring step, we'll create another pipeline aggregating samples over 5-minute windows.
Note that this use case could also be solved in batch mode, but in that case we would have to start the batch load 10 seconds (the allowed delay) after the end of the 5-minute window, and we would see the results only once the batch had completed. If collecting the data, calculating and storing the results takes 1 minute, we would discover the end status with 1 minute and 10 seconds of delay.
With Apache Flink®, if the query allows, the window information is captured and calculated incrementally during the window time itself. Therefore, once the 10-second delay has passed, only a minimal overhead remains for the last computation to finish before the result is emitted. For simple queries, we would retrieve the results just a little after the 10 seconds of forced delay.
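The latency comparison above boils down to simple arithmetic. The following Python sketch is only illustrative: the 10-second allowance and the 1-minute batch runtime come from the text, while the 1-second streaming overhead is an assumed placeholder for "a minimal overhead", not a measured value.

```python
# Illustrative latency comparison; all durations are in seconds.
ALLOWED_LATENESS = 10  # late-arrival allowance from the use case
BATCH_RUNTIME = 60     # time for the batch to collect, compute and store (from the text)
STREAM_OVERHEAD = 1    # ASSUMPTION: time to finalize an incrementally computed window


def batch_latency(allowed_lateness: int, batch_runtime: int) -> int:
    """Delay after the window closes before batch results are visible."""
    return allowed_lateness + batch_runtime


def streaming_latency(allowed_lateness: int, overhead: int) -> int:
    """Delay after the window closes before streaming results are emitted."""
    return allowed_lateness + overhead


batch = batch_latency(ALLOWED_LATENESS, BATCH_RUNTIME)
stream = streaming_latency(ALLOWED_LATENESS, STREAM_OVERHEAD)
print(f"batch: {batch}s, streaming: {stream}s")
```

With these numbers the batch result is visible 70 seconds after the window closes, while the streaming result follows shortly after the 10 seconds of allowed delay.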
What we'll notice when writing the two data pipelines is how similar Apache Flink® SQL is to the SQL we would use to query a relational database. This makes the migration from batch processes to streaming a matter of learning a few peculiarities of Apache Flink®.
Define the building blocks
We will use Aiven for Apache Kafka® as the data bus and Aiven for Apache Flink® to define some data pipelines in SQL.
Aiven provides a beautiful web console that we can use to create all our services, but for this blog post we will rely on the Aiven CLI to set up and manage our instances, since it provides a way to script the entire process and produce replicable results.
Once the Aiven CLI is installed, we can start creating the Aiven for Apache Kafka® service by issuing the following command in the terminal:
avn service create demo-kafka \
--service-type kafka \
--cloud google-europe-west3 \
--plan business-4 \
-c kafka.auto_create_topics_enable=true \
-c kafka_rest=true \
-c schema_registry=true
The above creates an Apache Kafka® instance named demo-kafka, located in the google-europe-west3 region, with the 3-node cluster defined by the business-4 plan. To store schema information we enable schema registry via Aiven's Karapace, and we allow querying it via REST calls by enabling Kafka REST. Finally, we allow the automatic creation of topics on the first message, which is useful for a demo project like this.
Now, we can create the Aiven for Apache Flink® service with:
avn service create demo-flink \
--service-type flink \
--cloud google-europe-west3 \
--plan business-4
Compared to the previous call, the only difference is the service name (now demo-flink) and type (now flink).
To complete the setup, we need to connect the two pillars with a service integration, and again the Aiven CLI is our friend.
avn service integration-create \
-t flink \
-s demo-kafka \
-d demo-flink
In the above, we specify an integration of type flink connecting the data source demo-kafka to the Aiven for Apache Flink® service called demo-flink.
Define the streaming input
We need some data to play with, and instead of the usual pizza example, we will now use fake metrics reported by IoT (Internet of Things) devices. The beauty is that the generator is contained in the same GitHub repository that we used for our previous example, which has a flag called subject allowing us to generate various types of fake data.
The associated GitHub repository provides a dockerised version. To start using it, we first clone the repository:
git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker
Next, we need to create an access token, which will be used by the Docker instance to retrieve the Apache Kafka® service URI and certificates. We create it with the following Aiven CLI command, using jq to extract the result:
avn user access-token create \
--description "Token used by Fake data generator" \
--max-age-seconds 3600 \
--json | jq -r '.[].full_token'
The above generates a token valid for 1 hour (3600 seconds). Take note of the command output (the value of the full_token field), since now it's time to include it in the repo config file. Within the fake-data-producer-for-apache-kafka-docker folder, let's copy conf/env.conf.sample to conf/env.conf and edit the file with the following content:
PROJECT_NAME="[YOUR_PROJECT_NAME]"
SERVICE_NAME="demo-kafka"
TOPIC="iot-data-input"
PARTITIONS=1
REPLICATION=2
NR_MESSAGES=0
MAX_TIME=1
SUBJECT="metric"
USERNAME="[YOUR_ACCOUNT]"
TOKEN="[YOUR_ACCESS_TOKEN]"
Replace [YOUR_PROJECT_NAME] with the name of the project where Aiven for Apache Kafka® is running, and the pair [YOUR_ACCOUNT], [YOUR_ACCESS_TOKEN] with the account credentials needed to connect to Apache Kafka®. If you followed these examples, they are your email and the token you generated a few steps above.
Time to build the docker image with:
docker build -t fake-data-producer-for-apache-kafka-docker .
If you change any parameter in the conf/env.conf file (like the access token), you need to rebuild the image so that the changes are included in the Docker image.
Finally, start the fake IoT data flow with:
docker run fake-data-producer-for-apache-kafka-docker
After a few seconds we should see a continuous flow of messages being created in the Apache Kafka® topic named iot-data-input. We can check them using kcat. Use the instructions in the dedicated doc page to install kcat and set up the kcat.config configuration file, and then check the data flowing in Apache Kafka® with:
kcat -F kcat.config -C -t iot-data-input
We should see a stream of IoT device CPU usage measurements.
Define a data filtering pipeline
For the rest of the post, we'll need the ID of the integration we just created. To retrieve it, we can issue the following command, relying on the Aiven CLI and some jq filtering of the results:
KAFKA_FLINK_SI=$( \
avn service integration-list \
demo-flink \
--json \
| jq -r '.[] | select(.source == "demo-kafka") | .service_integration_id')
The above command lists the integrations, selects the one whose source is the demo-kafka service, and stores its ID in the KAFKA_FLINK_SI variable, which we'll use later.
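For readers less familiar with jq, the selection logic can be sketched in Python. The sample JSON below is hypothetical and trimmed down: only the source and service_integration_id keys are taken from the jq filter above, and the ID values are invented placeholders.

```python
import json

# HYPOTHETICAL, trimmed-down stand-in for the JSON printed by
# `avn service integration-list demo-flink --json`; the IDs are made up.
sample_output = json.dumps([
    {"service_integration_id": "aaaa-1111", "source": "demo-kafka"},
    {"service_integration_id": "bbbb-2222", "source": "some-other-service"},
])


def integration_ids(raw_json: str, source: str) -> list:
    """Mimic: jq -r '.[] | select(.source == "<source>") | .service_integration_id'."""
    return [
        item["service_integration_id"]
        for item in json.loads(raw_json)
        if item.get("source") == source
    ]


ids = integration_ids(sample_output, "demo-kafka")
print(ids)
```

Like the jq filter, the function returns every matching integration, so the shell variable holds a single ID only when exactly one integration originates from demo-kafka.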
The next step is to set up a streaming data pipeline in Apache Flink®. Since we are using the Aiven managed service, we can use the Aiven CLI for all the steps. We start by defining an Apache Flink® table over the inbound Apache Kafka® topic; run the command in a terminal session where the KAFKA_FLINK_SI variable is set:
avn service flink table create demo-flink $KAFKA_FLINK_SI \
--table-name iot_in \
--kafka-topic iot-data-input \
--kafka-connector-type kafka \
--kafka-value-format json \
--kafka-startup-mode earliest-offset \
--schema-sql '''
hostname VARCHAR,
cpu VARCHAR,
usage DOUBLE,
`occurred_at` BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(`occurred_at`, 3),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '\''10'\'' seconds'''
The above command creates an Apache Flink® table definition on the demo-flink service, using the ID of the integration with the Apache Kafka® service stored in the $KAFKA_FLINK_SI variable we fetched before. The table is called iot_in and reads from the beginning (earliest-offset) of the topic iot-data-input using the standard kafka connector (check more about the different Apache Kafka® connectors in the dedicated documentation); the inbound data is declared to be in json format.
The schema of the table maps the fields hostname, cpu, usage and occurred_at. Moreover, it defines a new computed field called time_ltz, which converts the occurred_at value, a Unix epoch timestamp in milliseconds, to the native timestamp type used by Apache Flink®. Finally, the WATERMARK section allows late events to arrive: IoT measurements are considered valid and included in the downstream data pipelines if they arrive with a delay of up to 10 seconds.
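To build some intuition for the computed column and the watermark, here is a minimal Python sketch. It is a simplification, not how Apache Flink® is implemented: it assumes the watermark is recomputed on every record as the highest event time seen so far minus 10 seconds, and it only flags whether a record is still ahead of that watermark. The helper names and sample values are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ALLOWED_LATENESS = timedelta(seconds=10)  # mirrors INTERVAL '10' seconds


def to_timestamp(epoch_ms: int) -> datetime:
    """Convert epoch milliseconds to a UTC timestamp, in the spirit of TO_TIMESTAMP_LTZ(x, 3)."""
    return EPOCH + timedelta(milliseconds=epoch_ms)


def ahead_of_watermark(arrivals_ms, lateness=ALLOWED_LATENESS):
    """For each record, in arrival order, tell whether it is ahead of the watermark."""
    max_seen = None
    flags = []
    for ms in arrivals_ms:
        event_time = to_timestamp(ms)
        if max_seen is None or event_time > max_seen:
            max_seen = event_time
        # SIMPLIFICATION: watermark = highest event time seen so far - allowed lateness
        watermark = max_seen - lateness
        flags.append(event_time > watermark)
    return flags


t0 = 1642763100000  # 2022-01-21 11:05:00 UTC, in epoch milliseconds
# Arrival order: on time, 20 s later, then two out-of-order records (8 s and 15 s behind)
arrivals = [t0, t0 + 20_000, t0 + 12_000, t0 + 5_000]
flags = ahead_of_watermark(arrivals)
print(flags)
```

The record that is 8 seconds behind is still ahead of the watermark, while the one that is 15 seconds behind has fallen past it, so a window that should have contained it may already have been finalized.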
We can notice that, apart from the peculiar WATERMARK definition, the rest of the SQL definition is really similar to what we would use in a relational database.
We can define the data output of our filtering data pipeline with the following command:
avn service flink table create demo-flink $KAFKA_FLINK_SI \
--table-name iot_filtered_alert \
--kafka-topic iot-filtered-alert \
--kafka-connector-type kafka \
--kafka-value-format json \
--schema-sql '''
hostname VARCHAR,
time_ltz TIMESTAMP(3),
cpu VARCHAR,
usage DOUBLE
'''
Next we need to fetch the two table IDs that we're going to need for the filtering data pipeline definition. We can retrieve the ids with:
TABLE_IN_ID=$(avn service flink table list demo-flink --json \
| jq -r '.[] | select(.table_name == "iot_in") | .table_id')
TABLE_FILTER_OUT_ID=$(avn service flink table list demo-flink --json \
| jq -r '.[] | select(.table_name == "iot_filtered_alert") | .table_id')
Now we can define the SQL data pipeline for filtering with:
avn service flink job create demo-flink my_first_filter \
--table-ids $TABLE_IN_ID $TABLE_FILTER_OUT_ID \
--statement '''
insert into iot_filtered_alert
select
hostname,
time_ltz,
cpu,
usage
from
iot_in
where usage > 90
'''
The above filters the IoT measurements having usage greater than 90% and inserts them into the Apache Kafka® topic pointed to by the iot_filtered_alert table. We are passing the table IDs $TABLE_IN_ID and $TABLE_FILTER_OUT_ID retrieved in the previous step.
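The filtering statement has the same semantics as a simple predicate applied to each message as it arrives. The Python sketch below illustrates that idea on a few hand-written JSON messages; the sample values are invented, and only the field names come from the table schema.

```python
import json

THRESHOLD = 90.0  # mirrors "where usage > 90"


def filter_alerts(lines, threshold=THRESHOLD):
    """Yield only the records whose usage is strictly above the threshold."""
    for line in lines:
        record = json.loads(line)
        if record["usage"] > threshold:
            yield record


# HYPOTHETICAL input messages shaped like the iot_in schema
messages = [
    '{"hostname": "dopey", "cpu": "cpu4", "usage": 90.5, "occurred_at": 1642763123319}',
    '{"hostname": "doc", "cpu": "cpu1", "usage": 42.0, "occurred_at": 1642763124000}',
    '{"hostname": "grumpy", "cpu": "cpu2", "usage": 90.0, "occurred_at": 1642763125000}',
    '{"hostname": "happy", "cpu": "cpu4", "usage": 99.7, "occurred_at": 1642763125954}',
]

alerts = list(filter_alerts(messages))
for alert in alerts:
    print(alert["hostname"], alert["cpu"], alert["usage"])
```

Note that the comparison is strict, so a sample at exactly 90 is not forwarded.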
We can check the filtered data being pushed to the Apache Kafka® topic named iot-filtered-alert with kcat and the following call:
kcat -F kcat.config -C -t iot-filtered-alert
Only measurements with over 90% utilization should appear in the iot-filtered-alert topic, similar to the below:
{"hostname":"dopey","time_ltz":"2022-01-21 11:05:23.319","cpu":"cpu4","usage":90.51760978462883}
{"hostname":"happy","time_ltz":"2022-01-21 11:05:25.954","cpu":"cpu4","usage":99.6863297672615}
{"hostname":"sneezy","time_ltz":"2022-01-21 11:05:30.701","cpu":"cpu5","usage":97.19928378993606}
{"hostname":"doc","time_ltz":"2022-01-21 11:05:32.142","cpu":"cpu2","usage":95.69989296729409}
{"hostname":"doc","time_ltz":"2022-01-21 11:05:32.773","cpu":"cpu3","usage":94.04115316937872}
{"hostname":"grumpy","time_ltz":"2022-01-21 11:05:34.052","cpu":"cpu2","usage":95.45458336597062}
{"hostname":"bashful","time_ltz":"2022-01-21 11:05:34.506","cpu":"cpu1","usage":93.83903103097724}
Great, we built our first filtering pipeline redirecting problematic samples to a new Apache Kafka® topic where downstream applications can access them and generate the required alerts. It's streaming, so the alerts are detected and propagated in near real time.
Define an aggregation pipeline
In the second scenario, we want to define an aggregation over the IoT metrics, calculating the average and maximum usage over 5-minute windows to help us smooth out the bursty sample behavior. The source table is still the iot_in table defined above, so we only need to define a new output table with:
avn service flink table create demo-flink $KAFKA_FLINK_SI \
--table-name iot_avg_out \
--kafka-topic iot-avg-output \
--kafka-connector-type kafka \
--kafka-value-format json \
--schema-sql '''
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
hostname VARCHAR,
cpu VARCHAR,
avg_usage DOUBLE,
max_usage DOUBLE
'''
The above command generates a new Apache Flink® table definition called iot_avg_out pointing to the topic iot-avg-output. The results will be written in JSON format. The schema SQL includes the window_start and window_end columns defining the boundaries of the window calculated using the event time, the IoT device hostname and cpu, and the aggregated columns for average and maximum usage.
As before, the streaming job definition requires the table ID as a parameter. We can retrieve the new table ID with the following command:
TABLE_AGG_OUT_ID=$(avn service flink table list demo-flink --json \
| jq -r '.[] | select(.table_name == "iot_avg_out") | .table_id')
With all the information needed, we can now define the streaming job with:
avn service flink job create demo-flink my_first_agg \
--table-ids $TABLE_IN_ID $TABLE_AGG_OUT_ID \
--statement '''
insert into iot_avg_out
select
window_start,
window_end,
hostname,
cpu,
avg(usage),
max(usage)
from
TABLE(TUMBLE( TABLE iot_in, DESCRIPTOR(time_ltz), INTERVAL '\''5'\'' MINUTES))
group by
window_start,
window_end,
hostname,
cpu
'''
The above call creates a new job named my_first_agg, including the $TABLE_IN_ID from earlier and the $TABLE_AGG_OUT_ID table ID we fetched in the previous step. The SQL statement creates the 5-minute time windows, based on the event time column time_ltz, with the TUMBLE function (check out the dedicated window documentation) and performs the aggregations. We can notice again that, apart from the custom TUMBLE definition, the SQL statement is really similar to the one we could use in a relational database.
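To make the tumbling window semantics concrete, the aggregation can be sketched in plain Python. This is an illustration under simplifying assumptions, not Apache Flink®'s implementation: it processes a finite list instead of a stream, ignores watermarks, and assumes windows are aligned to the Unix epoch. The sample records and the at helper are made up.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW_SIZE = timedelta(minutes=5)  # mirrors INTERVAL '5' MINUTES


def window_start(event_time, size=WINDOW_SIZE):
    """Floor an event time to the start of its epoch-aligned tumbling window."""
    return EPOCH + ((event_time - EPOCH) // size) * size


def aggregate(records, size=WINDOW_SIZE):
    """Group by (window_start, hostname, cpu) and compute (avg, max) of usage."""
    groups = defaultdict(list)
    for event_time, hostname, cpu, usage in records:
        groups[(window_start(event_time, size), hostname, cpu)].append(usage)
    return {key: (sum(vals) / len(vals), max(vals)) for key, vals in groups.items()}


def at(hour, minute, second=0):
    """Illustrative helper: a UTC timestamp on the sample day."""
    return datetime(2022, 1, 21, hour, minute, second, tzinfo=timezone.utc)


# HYPOTHETICAL samples: (event time, hostname, cpu, usage)
samples = [
    (at(10, 21), "dopey", "cpu2", 80.0),
    (at(10, 24, 59), "dopey", "cpu2", 90.0),
    (at(10, 25), "dopey", "cpu2", 70.0),  # window end is exclusive: next window
    (at(10, 22, 30), "happy", "cpu1", 60.0),
]

result = aggregate(samples)
for (start, hostname, cpu), (avg_usage, max_usage) in sorted(result.items()):
    print(start.time(), (start + WINDOW_SIZE).time(), hostname, cpu, avg_usage, max_usage)
```

Each record belongs to exactly one window, which is what distinguishes tumbling windows from sliding ones. In the real job, a window's row is emitted only once the watermark has passed the window end.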
To check the aggregated results being pushed to the iot-avg-output Apache Kafka® topic, we use kcat and the following call:
kcat -F kcat.config -C -t iot-avg-output
The result should be similar to the following, showing the window start and end time, the IoT device hostname, the CPU and the aggregated utilization metrics:
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"dopey","cpu":"cpu2","avg_usage":83.75267676383785,"max_usage":98.69540798403519}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"happy","cpu":"cpu2","avg_usage":86.92580172018437,"max_usage":99.19071554040988}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"happy","cpu":"cpu1","avg_usage":88.89410118795028,"max_usage":98.63471212455435}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"dopey","cpu":"cpu4","avg_usage":84.0739509212965,"max_usage":98.74529247031236}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"grumpy","cpu":"cpu4","avg_usage":79.39103768789909,"max_usage":91.3979757069514}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"sneezy","cpu":"cpu2","avg_usage":84.2944495941075,"max_usage":99.55062006322083}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"grumpy","cpu":"cpu3","avg_usage":82.57313518548608,"max_usage":99.47190111994642}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"sneezy","cpu":"cpu4","avg_usage":86.71661332259018,"max_usage":99.61266191120228}
Now that we have created our topic of 5-minute windows, we could iterate again on our monitoring solution by adding a follow-up Apache Flink® job that keeps only the data going over a threshold. The beauty of this approach is that we can plug in one step of our solution at a time, storing the intermediate data in the common bus represented by Apache Kafka®.
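As a rough illustration of such a follow-up step, the Python sketch below applies a threshold to window aggregates like the ones shown earlier. The 85% threshold is an arbitrary assumption chosen for the example, and the input rows are shortened copies of the sample output above.

```python
import json

AVG_THRESHOLD = 85.0  # ASSUMPTION: arbitrary limit for sustained load


def sustained_load(lines, threshold=AVG_THRESHOLD):
    """Return (hostname, cpu) pairs whose window average exceeds the threshold."""
    rows = (json.loads(line) for line in lines)
    return [(r["hostname"], r["cpu"]) for r in rows if r["avg_usage"] > threshold]


# Shortened copies of rows from the iot-avg-output sample above
window_rows = [
    '{"hostname": "dopey", "cpu": "cpu2", "avg_usage": 83.75267676383785}',
    '{"hostname": "happy", "cpu": "cpu2", "avg_usage": 86.92580172018437}',
    '{"hostname": "happy", "cpu": "cpu1", "avg_usage": 88.89410118795028}',
]

flagged = sustained_load(window_rows)
print(flagged)
```

In the actual solution this logic would live in another SQL job reading from a table defined over the iot-avg-output topic.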
Furthermore, since the data is stored in an Apache Kafka® topic, we could use Apache Kafka Connect® to sink the data to OpenSearch® and use OpenSearch Dashboards or Grafana® to visualize the trends.
With the combination of Apache Flink®, Apache Kafka®, and Apache Kafka Connect® we have endless possibilities in terms of streaming analytics and integrations.
Next steps
Apache Kafka® and Apache Flink® allow you to move away from batch processing and embrace streaming while keeping a familiar SQL interface for the pipeline definitions. Moreover, Apache Flink®'s rich SQL syntax allows you to define aggregations, boundaries and temporal limits that would be somewhat hard to define in traditional databases. To learn more about this open source combination and the related managed services Aiven offers:
- Check out Aiven for Apache Kafka®
- Check out Aiven for Apache Flink®
- Understand the difference between Event time and Processing time
- Learn all the available Apache Flink® SQL functions
If you're not using Aiven services yet, go ahead and sign up now for your free trial at https://console.aiven.io/signup!