This blog will cover data ingestion from Kafka to Azure Data Explorer (Kusto) using Kafka Connect.
Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from diverse sources such as websites, applications, IoT devices, and more. The Kafka Connect platform allows you to stream data between Apache Kafka and external systems in a scalable and reliable manner. The Kafka Connect sink connector for Azure Data Explorer allows you to move data from Kafka topics into Azure Data Explorer tables, which you can later query and analyze.
Here is the GitHub repo for this blog - https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial
The goal is to get started quickly, so we will keep things simple and Docker-ize everything! This includes Kafka, Zookeeper, the Kafka Connect worker and the event generator application - all defined in docker-compose.yaml
Over the course of this tutorial, you will:
- Get an overview of the individual components
- Configure and setup Azure Data Explorer and install the connector
- Run the end to end demo
If you're looking for comprehensive coverage of data ingestion with Azure Data Explorer, Kafka and Kubernetes, and would like a hands-on learning experience, please check out this workshop! https://github.com/Azure/azure-kusto-labs
Pre-requisites
- A Microsoft Azure account - maybe try a free one?
- Azure CLI installed if you don't have it already (should be quick!), or just use the Azure Cloud Shell from your browser
- Docker and Docker Compose installed
Overview
As previously mentioned, all the components are defined inside the docker-compose.yaml file. Let's go over it bit by bit:
The Kafka and Zookeeper part is pretty straightforward - it uses the Debezium images:
zookeeper:
  image: debezium/zookeeper:1.2
  ports:
    - 2181:2181
kafka:
  image: debezium/kafka:1.2
  ports:
    - 9092:9092
  links:
    - zookeeper
  depends_on:
    - zookeeper
  environment:
    - ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
The events-producer service is a simple application that sends Storm Events data to a Kafka topic. Storm Events data is a canonical example used throughout the Azure Data Explorer documentation (for example, check this Quickstart and the complete CSV file). The producer app uses the original CSV, but only includes selected fields (such as start and end time, state, source, etc.) rather than the entire row (which has more than 20 columns). Here is the sample data:
2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9487,NEW YORK,Winter Weather,Department of Highways
...
The service component in Docker Compose is defined as follows:
events-producer:
  build:
    context: ./storm-events-producer
  links:
    - kafka
  depends_on:
    - kafka
  environment:
    - KAFKA_BOOTSTRAP_SERVER=kafka:9092
    - KAFKA_TOPIC=storm-events
    - SOURCE_FILE=StormEvents.csv
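In case you're wondering what the producer does under the hood, here is a minimal, hypothetical Python sketch of the same idea - read the CSV line by line and send each row to the storm-events topic. The repo's actual events-producer is a separate implementation; this is just an illustration using the kafka-python client:

# Hypothetical sketch of the producer's logic (not the repo's actual events-producer):
# read StormEvents.csv and send each row as an event to the storm-events topic.
import os

from kafka import KafkaProducer  # pip install kafka-python

bootstrap_server = os.getenv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092")
topic = os.getenv("KAFKA_TOPIC", "storm-events")
source_file = os.getenv("SOURCE_FILE", "StormEvents.csv")

producer = KafkaProducer(bootstrap_servers=bootstrap_server)

with open(source_file) as f:
    for line in f:
        event = line.strip()
        if event:
            producer.send(topic, value=event.encode("utf-8"))
            print("event", event)

producer.flush()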
The sink connector is where a lot of the magic happens! Let's explore it:
Kafka Sink Connector for Azure Data Explorer
Here is the kusto-connect service in the Docker Compose file:
kusto-connect:
  build:
    context: ./connector
  ports:
    - 8083:8083
  links:
    - kafka
  depends_on:
    - kafka
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=adx
    - CONFIG_STORAGE_TOPIC=my_connect_configs
    - OFFSET_STORAGE_TOPIC=my_connect_offsets
    - STATUS_STORAGE_TOPIC=my_connect_statuses
The container is built from a Dockerfile - this makes it easier for you to run it locally, as opposed to pulling it from an external Docker registry:
FROM debezium/connect:1.2
WORKDIR $KAFKA_HOME/connect
ARG KUSTO_KAFKA_SINK_VERSION
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar
It's based on top of the Debezium Kafka Connect image. It simply downloads the Kusto connector JAR (version 1.0.1 at the time of writing) and places it in the Kafka Connect plugins directory. That's it!
Here is what the sink connector configuration file looks like:
{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}
The process of loading/importing data into a table in Azure Data Explorer is known as ingestion. This is how the connector operates as well.
Behind the scenes, it uses the following modules in the Java SDK for Azure Data Explorer:
- data: to connect, issue (control) commands, and query data
- ingest: to ingest data
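The connector itself uses the Java SDK, but to illustrate what the data module does, here is a rough equivalent sketch with the Python SDK (azure-kusto-data) - the cluster URL, database name and credentials below are placeholders:

# Illustrative sketch using the Python SDK (azure-kusto-data), roughly equivalent to
# the Java SDK's "data" module used by the connector - not the connector's actual code.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

cluster = "https://<name of cluster>.<region>.kusto.windows.net"  # query endpoint (no "ingest-" prefix)
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    cluster, "<application ID>", "<client secret>", "<tenant ID>")

client = KustoClient(kcsb)
response = client.execute("<database name>", "Storms | count")  # run a query against the database
for row in response.primary_results[0]:
    print(row)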
At the time of writing, the data formats supported by the connector are: csv, json, txt, avro, apacheAvro, tsv, scsv, sohsv and psv. Data in the Kafka topics is written to files on disk. These are then sent to Azure Data Explorer based on the following connector configurations: when the file has reached flush.size.bytes, or when the flush.interval.ms interval has passed.
The only exception to the above mechanism is the avro and apacheAvro data types, which are handled as byte arrays.
By "sent to Azure Data Explorer", what I really mean is that the file is queued for ingestion (using IngestClient.ingestFromFile)
Alright, lots of theory so far...
.. let's try it out!
Clone this repo:
git clone https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial
cd kafka-kusto-ingestion-tutorial
Start off by creating an Azure Data Explorer cluster and database using the Azure Portal, Azure CLI or any of the client SDKs such as Python.
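For example, here is a rough sketch of doing this with the Python management SDK (azure-mgmt-kusto) - the resource names, location and SKU below are placeholders, and method names may differ slightly between SDK versions:

# Rough sketch: create a Dev/Test cluster and a database using azure-mgmt-kusto.
# Everything below (names, location, SKU) is a placeholder - adjust to your environment.
from azure.identity import AzureCliCredential
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import AzureSku, Cluster, ReadWriteDatabase

subscription_id = "<subscription ID>"
resource_group = "<resource group name>"
location = "westus"

client = KustoManagementClient(AzureCliCredential(), subscription_id)

# create (or update) the cluster - this can take several minutes
client.clusters.begin_create_or_update(
    resource_group,
    "<cluster name>",
    Cluster(location=location, sku=AzureSku(name="Dev(No SLA)_Standard_D11_v2", capacity=1, tier="Basic")),
).result()

# create the database on that cluster
client.databases.begin_create_or_update(
    resource_group, "<cluster name>", "<database name>", ReadWriteDatabase(location=location)
).result()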
Once that's done, create a table (Storms) and the respective mapping (Storms_CSV_Mapping):
.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Start containers and install the connector
Before installing the connector, we need to create a Service Principal for the connector to authenticate and connect to the Azure Data Explorer service. Use the az ad sp create-for-rbac command:
az ad sp create-for-rbac -n "kusto-sp"
You will get a JSON response like this - please note down the appId, password and tenant, as you will be using them in subsequent steps:
{
    "appId": "fe7280c7-5705-4789-b17f-71a472340429",
    "displayName": "kusto-sp",
    "name": "http://kusto-sp",
    "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
    "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}
Add permissions to your database
Provide the appropriate role to the Service Principal you just created. To assign the admin role, follow this guide to use the Azure portal, or run the following command in your Data Explorer cluster:
.add database <database name> admins ('aadapp=<service principal AppID>;<service principal TenantID>') 'AAD App'
Start the containers:
docker-compose up
The producer application will start sending events to the storm-events topic. You should see logs similar to:
....
events-producer_1 | sent message to partition 0 offset 0
events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
events-producer_1 |
events-producer_1 | sent message to partition 0 offset 1
events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
events-producer_1 |
events-producer_1 | sent message to partition 0 offset 2
events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
events-producer_1 |
events-producer_1 | sent message to partition 0 offset 3
events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9494,NEW YORK,Winter Weather,Department of Highways
events-producer_1 |
events-producer_1 | sent message to partition 0 offset 4
events-producer_1 | 2020/08/20 16:51:35 event 2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
....
We can now install the sink connector to consume these events and ingest them into Azure Data Explorer. Replace the values for the following attributes in adx-sink-config.json: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name) and kusto.url.
{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}
In a different terminal, keep track of the connector service logs:
docker-compose logs -f | grep kusto-connect
Install the connector:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
# check the status
curl http://localhost:8083/connectors/storm/status
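If you prefer to script this, here is a small, hypothetical Python helper that registers the connector and polls the standard Kafka Connect REST API until everything reports RUNNING:

# Hypothetical helper: register the sink connector and wait until it reports RUNNING,
# using the Kafka Connect REST API exposed on localhost:8083.
import json
import time

import requests

CONNECT_URL = "http://localhost:8083"

with open("adx-sink-config.json") as f:
    config = json.load(f)

# register the connector (fails with 409 if it already exists)
requests.post(f"{CONNECT_URL}/connectors", json=config).raise_for_status()

while True:
    status = requests.get(f"{CONNECT_URL}/connectors/storm/status").json()
    states = [status["connector"]["state"]] + [t["state"] for t in status.get("tasks", [])]
    print("connector/task states:", states)
    if all(s == "RUNNING" for s in states):
        break
    time.sleep(5)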
The connector should spring into action. Meanwhile in the other terminal, you should see logs similar to:
kusto-connect_1 | INFO || Refreshing Ingestion Resources [com.microsoft.azure.kusto.ingest.ResourceManager]
kusto-connect_1 | INFO || Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_0.csv.gz) of size (9192) at current offset (93) [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1 | INFO || WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 1: {storm-events-0=OffsetAndMetadata{offset=94, leaderEpoch=null, metadata=''}} [org.apache.kafka.connect.runtime.WorkerSinkTask]
kusto-connect_1 | INFO || Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_94.csv.gz) of size (1864) at current offset (111) [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1 | INFO || WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 2: {storm-events-0=OffsetAndMetadata{offset=112, leaderEpoch=null, metadata=''}} [org.apache.kafka.connect.runtime.WorkerSinkTask]
....
Wait for some time before data ends up in the Storms table. To confirm, check the row count and confirm that there are no failures in the ingestion process:
Storms | count
.show ingestion failures
Once there is some data, try out a few queries. To see all the records:
Storms
Use where and project to filter specific data:
Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId
Use the summarize operator:
Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart
These are just a few examples. Dig into the Kusto Query Language documentation or explore tutorials on ingesting JSON-formatted sample data into Azure Data Explorer, using scalar operators, timecharts, etc.
If you want to re-start from scratch, simply stop the containers (docker-compose down -v), delete (.drop table Storms) and re-create the Storms table (along with the mapping), and re-start the containers (docker-compose up).
Clean up
To delete the Azure Data Explorer cluster/database, use az kusto cluster delete or az kusto database delete:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
That's a wrap!
I hope this helps you get started building data ingestion pipelines from Kafka to Azure Data Explorer using the Kafka Connect sink connector. This is not the only way to ingest data into Azure Data Explorer (of course!). You're welcome to explore the documentation and try other techniques such as One-click Ingestion, Event Grid, IoT Hub and much more!
Until next time, Happy Exploring!