DEV Community

Robin Moffatt
Robin Moffatt

Posted on • Originally published at rmoff.net on

Using the Debezium MS SQL connector with ksqlDB embedded Kafka Connect

Prompted by a question on StackOverflow I thought I’d take a quick look at setting up ksqlDB to ingest CDC events from Microsoft SQL Server using Debezium. Some of this is based on my previous article, Streaming data from SQL Server to Kafka to Snowflake ❄️ with Kafka Connect.

Setting up the Docker Compose

I like standalone, repeatable, demo code. For that reason I love using Docker Compose and I embed everything in there - connector installation, the kitchen sink - the works.

You can find the complete Docker Compose file on GitHub.

Installing connectors in ksqlDB without Confluent Hub client

I’ll usually take advantage of the command: stanza in a Docker Compose service to do things like connector installation, as detailed here. In ksqlDB 0.11 the Confluent Hub client is absent so I’ve had to take a slightly hackier route. If you head over to Confluent Hub and download the connector you want (in this case Debezium MS SQL) you can check the network console to get the direct URL, in this case

https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip
Enter fullscreen mode Exit fullscreen mode

Now, this URL is liable to change but for now, it works :) (I realise that this runs contrary to making a demo repeatable, but what’s life if we can’t live on the edge a bit)

With this code we can download and install the connector within the ksqlDB Docker container when it spins up

curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
yum install -y unzip
unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
Enter fullscreen mode Exit fullscreen mode

There’s a wrinkle in the plan here which is that the latest version of the container runs as a non-root user, and sudo is not installed (no sandwiches for me). To hack around this we elevate the container to run as root user in the Docker Compose spec:

  ksqldb:
    image: confluentinc/ksqldb-server:0.11.0
    container_name: ksqldb
    user: root
    …
Enter fullscreen mode Exit fullscreen mode

Now when the container launches it downloads the connector, installs unzip and unzips the connector archive directly into the plugin.path in which Kafka Connect (running embedded in ksqlDB) will look for it.

NOTE: The 'proper' way to do this is either bake your own ksqlDB image with the connector plugin already installed, or to download the connector to the host machine, and mount it into the ksqlDB container. Both of these are fine, but involve more moving parts and stuff to go wrong than a standalone Docker Compose file for my purposes :) |

Running the stack

Spin up the Docker Compose file

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

and then launch ksqlDB - this seemingly complex snippet simply waits for ksqlDB to be available before launching the CLI:

docker exec -it ksqldb bash -c 'echo -e "\n\n Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [$curl_status -eq 200] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
Enter fullscreen mode Exit fullscreen mode

In a separate terminal, once ksqlDB has finished starting (i.e. once the ksqlDB CLI starts from the above command) make sure that the MS SQL connector has installed correctly:

docker exec ksqldb curl -s localhost:8083/connector-plugins
Enter fullscreen mode Exit fullscreen mode

You should see

[{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.2.2.Final"}]
Enter fullscreen mode Exit fullscreen mode

Configuring MS SQL for CDC

When the MS SQL container starts a couple of scripts are run to set up the database for CDC and add some test data. If you’re not using the Docker Compose then you need to run these yourself:

USE [master]
GO
CREATE DATABASE demo;
GO
USE [demo]
EXEC sys.sp_cdc_enable_db
GO

-- Run this to confirm that CDC is now enabled:
SELECT name, is_cdc_enabled FROM sys.databases;
GO

use [demo];

CREATE TABLE demo.dbo.ORDERS ( order_id INT, customer_id INT, order_ts DATE, order_total_usd DECIMAL(5,2), item VARCHAR(50) );
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'ORDERS',
@role_name = NULL,
@supports_net_changes = 0
GO

-- At this point you should get a row returned from this query
SELECT s.name AS Schema_Name, tb.name AS Table_Name , tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc FROM sys.tables tb INNER JOIN sys.schemas s on s.schema_id = tb.schema_id WHERE tb.is_tracked_by_cdc = 1
GO
-- h/t William Prigol Lopes https://stackoverflow.com/a/61698148/350613
Enter fullscreen mode Exit fullscreen mode

Adding a SQL Server connector in ksqlDB

You should now have the ksqlDB prompt open

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.11.0, Server v0.11.0 located at http://ksqldb:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
Enter fullscreen mode Exit fullscreen mode

From the ksqlDB prompt create the connector:

CREATE SOURCE CONNECTOR SOURCE_MSSQL_ORDERS_01 WITH (
      'connector.class' = 'io.debezium.connector.sqlserver.SqlServerConnector',
      'database.hostname' = 'mssql',
      'database.port' = '1433',
      'database.user' = 'sa',
      'database.password' = 'Admin123',
      'database.dbname' = 'demo',
      'database.server.name' = 'mssql',
      'table.whitelist' = 'dbo.orders',
      'database.history.kafka.bootstrap.servers' = 'broker:29092',
      'database.history.kafka.topic' = 'dbz_dbhistory.mssql.asgard-01',
      'decimal.handling.mode' = 'double'
);
Enter fullscreen mode Exit fullscreen mode

Check that it’s running successfully

SHOW CONNECTORS;

 Connector Name         | Type   | Class                                              | Status
--------------------------------------------------------------------------------------------------------------------
 SOURCE_MSSQL_ORDERS_01 | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
--------------------------------------------------------------------------------------------------------------------
Enter fullscreen mode Exit fullscreen mode

If it’s not (e.g. if the Status is WARNING) then run docker logs -f ksqldb and page through to find the ERROR.

Using MS SQL data in ksqlDB

With the connector running and the data flowing you can declare a stream against it

CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql.dbo.ORDERS', VALUE_FORMAT='AVRO');
Enter fullscreen mode Exit fullscreen mode

and then start to explore the data:

SET 'auto.offset.reset' = 'earliest';

SELECT SOURCE->NAME, SOURCE->SCHEMA + '.' + SOURCE->"TABLE", OP,BEFORE,AFTER FROM ORDERS EMIT CHANGES LIMIT 2;

+-----------------------+-----------------------+-------+---------+-----------------------+
|NAME                   |KSQL_COL_0             |OP     |BEFORE   |AFTER                  |
+-----------------------+-----------------------+-------+---------+-----------------------+
|mssql                  |dbo.ORDERS             |r      |null     |{ORDER_ID=1, CUSTOMER_I|
|                       |                       |       |         |D=7, ORDER_TS=18256, OR|
|                       |                       |       |         |DER_TOTAL_USD=2.1, ITEM|
|                       |                       |       |         |=Proper Job}           |
|mssql                  |dbo.ORDERS             |r      |null     |{ORDER_ID=2, CUSTOMER_I|
|                       |                       |       |         |D=8, ORDER_TS=18236, OR|
|                       |                       |       |         |DER_TOTAL_USD=0.23, ITE|
|                       |                       |       |         |M=Wainwright}          |
Enter fullscreen mode Exit fullscreen mode

Note the use of the operator to access the nested fields.

SELECT AFTER->ORDER_ID, AFTER->CUSTOMER_ID, AFTER->ORDER_TOTAL_USD FROM ORDERS EMIT CHANGES LIMIT 5;

+-------------+--------------+------------------+
|ORDER_ID     |CUSTOMER_ID   |ORDER_TOTAL_USD   |
+-------------+--------------+------------------+
|1            |7             |2.1               |
|2            |8             |0.23              |
|3            |12            |4.3               |
|4            |7             |4.88              |
|5            |14            |3.89              |
Limit Reached
Query terminated
Enter fullscreen mode Exit fullscreen mode

Capturing changes from MS SQL

So far we’ve just seen the snapshot/bootstrap ingest of data from MS SQL into Kafka/ksqlDB. Let’s make some changes in MS SQL and see how they show up in ksqlDB.

Launch the MS SQL CLI

docker exec -it mssql bash -c '/opt/mssql-tools/bin/sqlcmd -l 30 -d demo -S localhost -U sa -P $SA_PASSWORD'
Enter fullscreen mode Exit fullscreen mode

Make some changes to the data

DELETE FROM ORDERS WHERE ORDER_ID=1;
UPDATE ORDERS SET ORDER_TOTAL_USD = ORDER_TOTAL_USD * 0.9 WHERE ORDER_ID =2;
INSERT INTO ORDERS (order_id, customer_id, order_ts, order_total_usd, item) values (9, 5, '2019-11-29T11:10:39Z', '2.24', 'Black Sheep Ale');
GO
Enter fullscreen mode Exit fullscreen mode

Check out the data in ksqlDB

SELECT OP,
       SOURCE->SCHEMA + '.' + SOURCE->"TABLE",
       BEFORE->ORDER_ID AS B_ORDER_ID,
       AFTER->ORDER_ID AS A_ORDER_ID,
       BEFORE->ORDER_TOTAL_USD AS B_ORDER_TOTAL_USD,
       AFTER->ORDER_TOTAL_USD AS A_ORDER_TOTAL_USD,
       BEFORE->ITEM AS B_ITEM,
       AFTER->ITEM AS A_ITEM
  FROM ORDERS
  WHERE NOT OP='r'
  EMIT CHANGES;

+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|OP   |KSQL_COL_0   |B_ORDER_ID  |A_ORDER_ID |B_ORDER_TOTAL_USD  |A_ORDER_TOTAL_USD |B_ITEM               |A_ITEM               |
+-----+-------------+------------+-----------+-------------------+------------------+---------------------+---------------------+
|d    |dbo.ORDERS   |1           |null       |2.1                |null              |Proper Job           |null                 |
|u    |dbo.ORDERS   |2           |2          |0.23               |0.21              |Wainwright           |Wainwright           |
|c    |dbo.ORDERS   |null        |9          |null               |2.24              |null                 |Black Sheep Ale      |
Enter fullscreen mode Exit fullscreen mode

Things to note:

  • The d deletion message gives you access to the row state before it was deleted

  • The u update message gives you the field values before they were updated (ORDER_TOTAL_USD)

  • The c creation message has null values in the BEFORE object because there were no values before the row was created :)

Top comments (0)