DEV Community

Cover image for MySQL to DynamoDB: Build a streaming data pipeline on AWS using Kafka
Abhishek Gupta for AWS

Posted on • Originally published at dzone.com

MySQL to DynamoDB: Build a streaming data pipeline on AWS using Kafka

Use change data capture with MSK Connect to sync data between Aurora MySQL and DynamoDB

This is the second part of the blog series which provides a step-by-step walkthrough of data pipelines with Kafka and Kafka Connect. I will be using AWS for demonstration purposes, but the concepts apply to any equivalent options (e.g. running these locally in Docker).

This part will show Change Data Capture in action that let's you track row-level changes in database tables in response to create, update and delete operations. For example, in MySQL, these change data events are exposed via the MySQL binary log (binlog).

In Part 1, we used the Datagen connector in the source part of the data pipeline - it helped us generate mock data to MSK topic and keep things simple. We will use Aurora MySQL as the source of data and leverage it's Change Data Capture capability with the Debezium connector for MySQL to extract data in real-time from tables in Aurora MySQL, push that to MSK topics. Then, we will continue to use the DynamoDB sink connector just like we did before.

If you're new to Debezium...

It is a distributed platform that builds on top of Change Data Capture features available in different databases. It provides a set of Kafka Connect connectors which tap into row-level changes (using CDC) in database table(s) and convert them into event streams. These are sent to Kafka and can be made available to all the downstream applications.


Here is a high level diagram of the solution presented in this blog post.

Image description

I am assuming that you are following along from Part 1 where the creation process for the base infrastructure and services required for this tutorial were already covered. If you haven't already, refer to the Prepare infrastructure components and services section in part 1 section

Data pipeline part 1: Aurora MySQL to MSK

Let's start by creating the first half of the pipeline to synchronise data from Aurora MySQL table to a topic in MSK.

In this section, you will:

  • Download the Debezium connector artefacts
  • Create Custom Plugin in MSK
  • Deploy the Debezium source connector to MSK Connect

At the end, you will have the first half of the data pipeline ready to go!

Create a Custom plugin and Connector

Upload the Debezium connector to Amazon S3

Log into the Kafka client EC2 instance and run these commands:

sudo -u ec2-user -i
mkdir debezium && cd debezium

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.0.Final/debezium-connector-mysql-1.9.0.Final-plugin.tar.gz
tar xzf debezium-connector-mysql-1.9.0.Final-plugin.tar.gz

cd debezium-connector-mysql
zip -9 ../debezium-connector-mysql-1.9.0.Final-plugin.zip *

cd ..
aws s3 cp ./debezium-connector-mysql-1.9.0.Final-plugin.zip s3://msk-lab-<ENTER_YOUR_AWS_ACCOUNT_ID>-plugins-bucket/
Enter fullscreen mode Exit fullscreen mode

Create Custom Plugin

For step by step instructions on how to create a MSK Connect Plugin, refer to Creating a custom plugin using the AWS Management Console in the official documentation.

While creating the Custom Plugin, make sure to choose the Debezium connector zip file you uploaded to S3 in the previous step.

Image description

Create the Debezium source connector

For step by step instructions on how to create a MSK Connect Connector, refer to Creating a connector in the official documentation.

To create a connector:

  1. Choose the plugin you just created.
  2. Enter the connector name and choose the MSK cluster along with IAM authentication
  3. You can enter the content provided below in the connector configuration section. Make sure you replace the following configuration as per your setup:
  • database.history.kafka.bootstrap.servers - Enter the MSK cluster endpoint
  • database.hostname - Enter Aurora RDS MySQL Endpoint

Leave the rest of configuration unchanged

connector.class=io.debezium.connector.mysql.MySqlConnector
database.user=master
database.server.id=123456
tasks.max=1
database.history.kafka.topic=dbhistory.salesdb
database.history.kafka.bootstrap.servers=<ENTER MSK CLUSTER ENDPOINT>
database.server.name=salesdb
database.port=3306
include.schema.changes=true
database.hostname=<ENTER RDS MySQL ENDPOINT>
database.password=S3cretPwd99
database.include.list=salesdb
value.converter.schemas.enable=false
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
Enter fullscreen mode Exit fullscreen mode
  1. Under Access permissions, choose the correct IAM role (the one with AuroraConnectorIAMRole in its name) for the connector
  2. Click Next to move to the Security options - leave them unchanged
  3. Click Next. For Log delivery, choose Deliver to Amazon CloudWatch Logs. Locate and select /msk-connect-demo-cwlog-group
  4. Click Next - On the final page, scroll down and click Create connector to start the process and wait for the connector to start.

Once that's done and the connector has transitioned to Running state, proceed with the below steps.

Test the pipeline

We want to confirm whether records from the SALES_ORDER table in the salesdb database have been pushed to MSK topic. To do that, from the EC2 host, run the Kafka CLI consumer.

Note the topic name salesdb.salesdb.SALES_ORDER - this is as per Debezium convention

sudo -u ec2-user -i

export MSK_BOOTSTRAP_ADDRESS=<ENTER MSK CLUSTER ENDPOINT>

/home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server $MSK_BOOTSTRAP_ADDRESS --consumer.config /home/ec2-user/kafka/config/client-config.properties --from-beginning --topic salesdb.salesdb.SALES_ORDER | jq --color-output .
Enter fullscreen mode Exit fullscreen mode

In another terminal, use MySQL client and connect to the Aurora database and insert few records:

sudo -u ec2-user -i

export RDS_AURORA_ENDPOINT=<ENTER RDS MySQL ENDPOINT>

mysql -f -u master -h $RDS_AURORA_ENDPOINT  --password=S3cretPwd99

USE salesdb;

select * from SALES_ORDER limit 5;

INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29001, 2568, now(), 'STANDARD');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29002, 1649, now(), 'ONE-DAY');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29003, 3861, now(), 'TWO-DAY');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29004, 2568, now(), 'STANDARD');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29005, 1649, now(), 'ONE-DAY');
INSERT INTO SALES_ORDER (ORDER_ID, SITE_ID, ORDER_DATE, SHIP_MODE) VALUES (29006, 3861, now(), 'TWO-DAY');
Enter fullscreen mode Exit fullscreen mode

If everything is setup correctly, you should see the records in the consumer terminal.

{
  "ORDER_ID": 29001,
  "SITE_ID": 2568,
  "ORDER_DATE": 1655279536000,
  "SHIP_MODE": "STANDARD"
}
{
  "ORDER_ID": 29002,
  "SITE_ID": 1649,
  "ORDER_DATE": 1655279536000,
  "SHIP_MODE": "ONE-DAY"
}
{
  "ORDER_ID": 29003,
  "SITE_ID": 3861,
  "ORDER_DATE": 1655279563000,
  "SHIP_MODE": "TWO-DAY"
}
...
Enter fullscreen mode Exit fullscreen mode

The secret to compact change event payloads

Notice how compact the change data capture event payload is. This is because we configured the connector to use io.debezium.transforms.ExtractNewRecordState which is a Kafka Single Message Transform (SMT). By default Debezium change event structure is quite complex - along with the change event, it also includes metadata such as schema, source database info etc. It looks something like this:

{
  "before": null,
  "after": {
    "ORDER_ID": 29003,
    "SITE_ID": 3861,
    "ORDER_DATE": 1655279563000,
    "SHIP_MODE": "TWO-DAY"
  },
  "source": {
    "version": "1.9.0.Final",
    "connector": "mysql",
    "name": "salesdb",
    "ts_ms": 1634569283000,
    "snapshot": "false",
    "db": "salesdb",
    "sequence": null,
    "table": "SALES_ORDER",
    "server_id": 1733046080,
    "gtid": null,
    "file": "mysql-bin-changelog.000003",
    "pos": 43275145,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1655279563000,
  "transaction": null
...
Enter fullscreen mode Exit fullscreen mode

Thanks to the Kafka SMT (specified using transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState), we can effectively flatten the event payload and customize it as per our requirements.

For details, refer to New Record State Extraction in the Debezium documentation.


Data pipeline part 2: MSK to DynamoDB

We can now shift our focus to the second half of the pipeline that's responsible for taking data from MSK topic to DynamoDB table with the help of the DynamoDB Sink connector.

If the DynamoDB table is not present, the connector automatically creates one for you, but it uses default settings i.e. its creates a table in Provisioned Mode, with 10 read capacity units (RCUs) and 10 write capacity units (WCUs).

Image description

But your use-case might need a configuration. For example, in order to handle high volume of data, you may want to configure Auto scaling, or even better, activate On-Demand mode for your table.

That's exactly what we will do.

Before you proceed, create a DynamoDB table

Use the following settings:

  • Table name - kafka_salesdb.salesdb.SALES_ORDER (do not change the table name)
  • Partition key - ORDER_ID (Number)
  • Range key - SITE_ID (Number)
  • Capacity mode - On-demand

Image description

That's it, you're good to go!

Create a Custom plugin and Connector

For step by step instructions on how to create a MSK Connect Plugin, refer to Creating a custom plugin using the AWS Management Console in the official documentation.

While creating the Custom Plugin, make sure to choose the DynamoDB connector zip file you uploaded to S3 in the previous step.

For step by step instructions on how to create a MSK Connect Connector, refer to Creating a connector in the official documentation.

To create a connector:

  1. Choose the plugin you just created.
  2. Enter the connector name and choose the MSK cluster along with IAM authentication
  3. You can enter the content provided below in the connector configuration section. Make sure you replace the following configuration as per your setup:
  • Use the right topic name for topics attribute (we are using salesdb.salesdb.SALES_ORDER in this example, since that's the topic name format that Debezium source connector adopts)
  • For confluent.topic.bootstrap.servers, enter MSK cluster endpoint
  • For aws.dynamodb.endpoint and aws.dynamodb.region, enter the region where you created the DynamoDB table e.g. us-east-1

Leave the rest of configuration unchanged

connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
tasks.max=2
aws.dynamodb.region=<ENTER AWS REGION e.g. us-east-1>
aws.dynamodb.endpoint=https://dynamodb.<ENTER AWS REGION>.amazonaws.com
topics=salesdb.salesdb.SALES_ORDER
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
table.name.format=kafka_${topic}
confluent.topic.bootstrap.servers=<ENTER MSK CLUSTER ENDPOINT>
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=AWS_MSK_IAM
confluent.topic.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
aws.dynamodb.pk.hash=value.ORDER_ID
aws.dynamodb.pk.sort=value.SITE_ID
Enter fullscreen mode Exit fullscreen mode
  1. Under Access permissions, choose the correct IAM role (the one with DynamoDBConnectorIAMRole in its name) for the connector
  2. Click Next to move to the Security options - leave them unchanged
  3. Click Next. For Log delivery, choose Deliver to Amazon CloudWatch Logs. Locate and select /msk-connect-demo-cwlog-group
  4. Click Next - On the final page, scroll down and click Create connector to start the process and wait for the connector to start.

Once that's done and the connector has transitioned to Running state, proceed with the below steps.

Choosing DynamoDB primary key

In the above configuration we set aws.dynamodb.pk.hash and aws.dynamodb.pk.sort to value.ORDER_ID and value.SITE_ID respectively. This implies that the ORDER_ID field from the Kafka topic event payload will be used as the partition key and the value for SITE_ID will we designated as the Range key (depending on your requirements, you can also leave aws.dynamodb.pk.sort empty).

Test the end to end pipeline

As part of the initial load process, the connector makes sure that all the existing records from Kafka topic are persisted to the DynamoDB table specified in the connector configuration. In this case, you should see more than 29000 records (as per SALES_ORDER table) in DynamoDB and you can run queries to explore the data.

Image description

To continue testing the end to end pipeline, you can insert more data in the SALES_ORDER table and confirm that they were synchronised to Kafka via the Debezium source connector and all the way to DynamoDB, thanks to the sink connector.


Delete resources

Once you're done, delete the resources that you had created.

  • Delete the contents of the S3 bucket (msk-lab-<YOUR ACCOUNT_ID>-plugins-bucket)
  • Delete the CloudFormation stack
  • Delete the DynamoDB table
  • Delete the MSK Connect connectors, Plugins and Custom configuration

Conclusion & wrap up

Change Data Capture is a powerful tool, but we need a way to tap into these event logs and make it available to other services which depend on that data. In this part, you saw how we can leverage this capability to setup a streaming data pipeline between MySQL and DynamoDB using Kafka Connect.

This wraps up this series. Happy building!

Discussion (0)