DEV Community

Mohammad Arab Anvari
Mohammad Arab Anvari

Posted on • Updated on

Apply CDC From MySQL To Clickhouse on local environment

The aim of this tutorial is to capture every change (delete, insert, and update) from the Mysql table and sync it with Clickhouse.

Prerequisites

  • Mysql
  • Zookeeper
  • Kafka
  • Kafka-Connect
  • Clickhouse

We can set up all of these services with a simple docker-compose file(Source).

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuse
  clickhouse:
    image: clickhouse/clickhouse-server:23.2.4.12
    links:
     - kafka
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    ports:
      - 8123:8123
      - 9000:9000
Enter fullscreen mode Exit fullscreen mode

You can read more about the options of every service in this tutorial.
After saving the yaml file as docker-compose.yml :

export DEBEZIUM_VERSION=2.2
docker compose up
Enter fullscreen mode Exit fullscreen mode

Now we have a Mysql container which contains a simple database named inventory, a Kafka container, and Zookeeper which manages a Kafka cluster, connect instance which adds abilities of Kafka-Connectors to Kafka and also a Clickhouse instance. Now we have all perquisites.

debezium, clickhouse to mysql architecture
Image Source

Deploy Debezium connector

We can interact with Kafka-Connect with Rest API.
Base request :

curl -i -X {Request_Type} -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ 
Enter fullscreen mode Exit fullscreen mode

See current connectors :

curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
Enter fullscreen mode Exit fullscreen mode

Delete {my-conn} connector:

curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/{my-conn}
Enter fullscreen mode Exit fullscreen mode

Add connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{connector-config-as-json}'
Enter fullscreen mode Exit fullscreen mode

Config for MySQL Connector

{
    "name": "mysql-connector",
    "config": {
    "tasks.max": "1",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders",
    "database.server.id": "1",
    "message.key.columns": "inventory.orders:order_number",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbz.inventory.history",
    "snapshot.mode": "schema_only",
    "topic.prefix": "dbz.inventory.v2",
    "transforms": "unwrap",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
Enter fullscreen mode Exit fullscreen mode
  • name: The name of the connector.
  • config: The connector’s configuration.
  • tasks.max: Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
  • connector.class: Type of connector, On of These
  • database.hostname: The database host, which is the name of the Docker container running the MySQL server (mysql). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the hostname. If MySQL were running on a normal network, you would specify the IP address or resolvable hostname for this value.
  • database.user & database.password: Username and password of mysql user with these privileges. For this example, I use the root user and pass.
  • database.include.list: Only changes in the inventory database will be detected.
  • topic.prefix: A unique topic prefix. This name will be used as the prefix for all Kafka topics.
  • schema.history.internal.kafka.bootstrap.servers & schema.history.internal.kafka.topic: The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
  • transforms*: These transformations are needed to insert data in Clickhouse. More explanation here

Full reference of configs for MySQL connector can be found here.

Consume Messages From Kafka

We wanna see a list of topics in our Kafka broker. First, we should access bash inside the Kafka container :

docker exec -it {kafka-container-name} /bin/bash
Enter fullscreen mode Exit fullscreen mode

Then:

/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
Enter fullscreen mode Exit fullscreen mode

Note that the topic corresponding to our orders table in MySQL has such format: {topic.prefix}.{database_name}.{table_name}. In this example, it turns to dbz.inventory.v2.inventory.orders
To consume all messages from a topic:

 /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbz.inventory.v2.inventory.orders --from-beginning
Enter fullscreen mode Exit fullscreen mode

Set Up Clickhouse Tables

As mentioned in this article in Clickhouse doc, we need 3 tables:

  • A Table witch Kafka engine
  • A Materialized View table
  • A MergeTree table

Kafka Engine Table

As mentioned in the doc we should specify the format of message arriving from Kafka topic (one of these), We can use [[Kafka Schema Registry]] but here we wanna parse Json directly, So with help of solution provided in this post we get message as JSONString format and then parse it using Mat. View.

CREATE TABLE `default`.kafka_orders
(
`msg_json_str` String
)
Engine=Kafka('kafka:9092', 'dbz.inventory.v2.inventory.orders', 'clickhouse', 'JSONAsString')
Enter fullscreen mode Exit fullscreen mode

Full doc of Kafka engine in Clickhouse.

MergeTree Table

As mentioned at the first of this article we wanna capture delete and update so we use ReplacingMergeTree:

CREATE TABLE default.stream_orders
(
`order_number` Int16,
`order_date` DATE ,
`purchaser` Int16 ,
`quantity` Int16,
`product_id` Int16,
`__deleted` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY (order_number)
SETTINGS index_granularity = 8192
Enter fullscreen mode Exit fullscreen mode

Mat. View

We parse Json using JSONExtract functions in Clickhouse.
We should consider that Debezium treats DATE data type as a number of days since the 1970-01-01 Source. It's the cause of using toDate with combination of JSONExtractInt.

CREATE MATERIALIZED VIEW default.consumer__orders TO default.stream_orders
(
`order_number` Int16,
`order_date` DATE ,
`purchaser` Int16 ,
`quantity` Int16,
`product_id` Int16,
`__deleted` Nullable(String)
) AS
SELECT
JSONExtractInt(msg_json_str,'payload','order_number') AS order_number,
(toDate('1970-01-01')+JSONExtractInt(msg_json_str,'payload','order_date')) AS order_date,
JSONExtractInt(msg_json_str,'payload','purchaser') AS purchaser,
JSONExtractInt(msg_json_str,'payload','quantity') AS quantity,
JSONExtractInt(msg_json_str,'payload','product_id') as product_id,
JSONExtractString(msg_json_str,'payload','__deleted') AS __deleted
FROM default.kafka_orders
Enter fullscreen mode Exit fullscreen mode

A View (Optional)

Clickhouse will merge consumer__orders table in an irregular schedule so we can't see the latest version of data at all times. But we can use view to obtain this goal:

CREATE VIEW orders(
`order_number` Int16,
`order_date_` DATE ,
`purchaser` Int16 ,
`quantity` Int16,
`product_id` Int16
) AS
SELECT
order_number,
max(order_date) as order_date_,
argMax(purchaser,order_date) as purchaser,
argMax(quantity,order_date) as quantity,
argMax(product_id,order_date) as product_id
FROM default.stream_orders
WHERE `__deleted`= 'false'
GROUP BY order_number
Enter fullscreen mode Exit fullscreen mode

We can also use FINAL modified instead of GROUP BY but it's not recommended in a production environment.

Troubleshooting

In case of any error or even lack of data in tables, we should check Clickhouse server logs located in /var/log/clickhouse-server/clickhouse-server.err.log

References

Top comments (0)