Scott Wang

MySQL 8 Kafka Connect Tutorial on Docker

In this tutorial, we will use docker-compose and MySQL 8 to demonstrate Kafka Connect, with MySQL acting as the data source.

This tutorial is largely based on the Kafka Connect Tutorial on Docker. However, the original tutorial is outdated and simply won't work if you follow it step by step. This is a refresh of that tutorial, and to simplify things we drop the Avro setup, since it isn't needed to demonstrate Kafka Connect. Enough said, let's begin.

Preparation

First, you will need to download the MySQL Connector/J driver; it can be found at
MySQL Connector Driver

You will also need to download the JDBC plugins at
Confluent JDBC plugins

Extract both mysql-connector-java-8.0.22.tar.gz and confluentinc-kafka-connect-jdbc-10.0-2.1.zip. Create a jars directory, then move mysql-connector-java-8.0.22.jar and all of the .jar files from the confluentinc-kafka-connect-jdbc-10.0-2.1/lib/ directory into the jars directory.
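
On the command line, the preparation boils down to something like this (a sketch only; adjust the archive and directory names to match the exact versions you downloaded):

mkdir -p jars
tar -xzf mysql-connector-java-8.0.22.tar.gz
unzip confluentinc-kafka-connect-jdbc-<version>.zip
cp mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar jars/
cp confluentinc-kafka-connect-jdbc-<version>/lib/*.jar jars/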

docker-compose file

Here is the docker-compose file that contains everything you need to run this tutorial:

version: '2'
services:
  mysql:
    privileged: true
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: test
    image: mysql:8.0

  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    privileged: true
    ports:
      - 32181:32181
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:5.0.0
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-connector-mysql:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - 28083:28083
    links:
      - kafka
      - zookeeper
      - mysql
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 28083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars

Before you run docker-compose, make sure the docker-compose.yml file and the jars directory from the preparation step sit side by side in the same project directory.
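In other words, the layout should look roughly like this:

.
├── docker-compose.yml
└── jars/
    ├── mysql-connector-java-8.0.22.jar
    └── (all the .jar files from the Confluent JDBC connector's lib/ directory)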

Let the fun begin

Let the fun begin by starting the mysql service:

docker-compose up -d mysql
docker exec -it kafka-connector_mysql_1 bash

The container name (kafka-connector_mysql_1 here) follows docker-compose's <project>_<service>_1 convention, so it may differ if your project directory has a different name; check docker ps if in doubt. Inside the container, open the MySQL CLI with mysql -uroot -ptest and execute the following queries:

CREATE DATABASE IF NOT EXISTS connect_test;
USE connect_test;

CREATE TABLE IF NOT EXISTS test (
  id serial NOT NULL PRIMARY KEY,
  name varchar(100),
  email varchar(200),
  department varchar(200),
  modified timestamp default CURRENT_TIMESTAMP NOT NULL,
  INDEX `modified_index` (`modified`)
);

INSERT INTO test (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
exit;
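If you want to double-check the data before moving on, you can count the rows from the host (assuming the same container name as above; it should return 10):

docker exec kafka-connector_mysql_1 mysql -uroot -ptest -e "SELECT COUNT(*) FROM connect_test.test;"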

Let’s start the zookeeper and kafka services:

docker-compose up -d zookeeper kafka 

Wait a few seconds, then make sure all services are up and running:

docker ps

Once ZooKeeper, Kafka, and MySQL are all up and running, let's prepare our final course: Confluent Kafka Connect.
First, let's create the internal topics that will be used by the Connect worker:

docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-offsets --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact

docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-config --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact

docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-status --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
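If you want to confirm the topics were created, you can list them with the same tool:

docker-compose run --rm kafka kafka-topics --list --zookeeper zookeeper:32181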

Finally, let's start the Kafka Connect service:

docker-compose up -d kafka-connector-mysql
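The Connect worker can take a little while to load all of its plugins; if you want to watch it come up, tail its logs:

docker-compose logs -f kafka-connector-mysql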

Give it a few seconds, then create the JDBC source connector by making a REST API call to the Kafka Connect service:

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "quickstart-jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": 1,
      "connection.url": "jdbc:mysql://mysql:3306/connect_test",
      "connection.user": "root",
      "connection.password": "test",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "timestamp.column.name": "modified",
      "topic.prefix": "quickstart-jdbc-",
      "poll.interval.ms": 1000
    }
  }' \
  http://localhost:28083/connectors
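If the call succeeds, the worker echoes back the connector configuration. You can also list the registered connectors at any time; it should return ["quickstart-jdbc-source"]:

curl -s http://localhost:28083/connectors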

Wait a few seconds, then check the status to make sure it is RUNNING:

curl -s -X GET http://localhost:28083/connectors/quickstart-jdbc-source/status

You should see something similar to

{"name":"quickstart-jdbc-source","connector":{"state":"RUNNING","worker_id":"localhost:28083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:28083"}],"type":"source"}%
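If the state shows FAILED instead, the usual suspects are a missing .jar in the jars directory or a typo in the connection settings. Check the worker logs with docker-compose logs kafka-connector-mysql, fix the issue, then delete the connector and POST it again:

curl -X DELETE http://localhost:28083/connectors/quickstart-jdbc-source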

That’s it. Let’s verify our work by running a Kafka consumer,

docker-compose run --rm kafka kafka-console-consumer --bootstrap-server kafka:29092  --topic quickstart-jdbc-test --from-beginning

and you should see messages like the following show up

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":18,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":19,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":20,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
^CProcessed a total of 10 messages
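Because the connector runs in incrementing mode and polls every second, new rows show up almost immediately. With the consumer still running, insert another row from a second terminal (the 'carol' row here is just an arbitrary example) and a new message should appear:

docker exec kafka-connector_mysql_1 mysql -uroot -ptest -e "INSERT INTO connect_test.test (name, email, department) VALUES ('carol', 'carol@abc.com', 'marketing');"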

That’s it. To tear everything down, simply run

docker-compose down

Enjoy.

Discussion (1)

nhatdiec

Hi Scott Wang,
I'm a newbie on Kafka. I followed this article and got the error shown in the image below. Am I missing any files?
dev-to-uploads.s3.amazonaws.com/up...