DEV Community

Gustavo Gordillo
Gustavo Gordillo

Posted on • Edited on

SQL Server & MongoDB sync (Kafka + Kubernetes + Debezium)

The problem to solve

Some months back, I encountered a scenario where I needed to synchronize two databases: one based on SQL Server and the other on MongoDB. The SQL Server database was the primary source of information derived from the client's business logic, and the MongoDB was the source of information for our proprietary e-commerce sites.

We needed to synchronize some of the client SQL server operations on our Mongo DB.

The problem in pieces

After some research, we found a way to achieve this using an event-driven architecture pattern:

  1. We must capture events from SQL Server (INSERT, DELETE, UPDATES, etc.. )
  2. Stream those events
  3. Consume events and update the MongoDB.

Solution

We used the change data capture (CDC) of SQL Server in combination with Debizium connectors to capture the SQL operations (such as inserts, deletes, updates, etc..) and send them to Apache Kafka, an open-source event streaming platform. And for the consumers part we develop our own using python.

The architecture will look like this:

Debeizum architecture Docs
Image from Debizium documentation

Tutorial

I will explain how to do it using MiniKube (local kubernetes) ideal for testing before production. You probably need to have some basic knowladge of Kubernetes.

The first part in this process is to enable change data capture for the table of your interest (Microsoft Docs)

-- Enable CDC for a table specifying filegroup
USE MyDB
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'MyTable',
    @role_name     = N'MyRole',
    @filegroup_name = N'MyDB_CT',
    @supports_net_changes = 1
GO
Enter fullscreen mode Exit fullscreen mode

Once you enable the CDC, we can start using minikube to deploy locally our cluster.

Deploying architecture in Minikube

To deploy the architecture locally I strongly suggest to follow the steps in the debizium documentation, It's straigh forward and the only thing I could do is rewrite it here, and its not worth it. Insted I will hightligth some key steps to use it with SQl server connector insted of Mysql connector (which is the current example in the docs)

Configuring Kafka connect

In order to connect to SQL Server you need to use the Kafka connect connector: debizium-sqlserver-connnector, you can find it here: https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/

Then use the next kafka-connect-configuration file to create the connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: strimzi-cluster-operator
#  annotations:
#  # use-connector-resources configures this KafkaConnect
#  # to use KafkaConnector resources to avoid
#  # needing to call the Connect REST API directly
#    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.5.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output: # (2)
      type: docker
      image: {here-go-your-container-repository-registry}
      pushSecret: {your-secret}
    plugins: # (3)
      - name: debezium-sqlserver-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.3.3.Final/debezium-connector-sqlserver-2.3.3.Final-plugin.tar.gz
Enter fullscreen mode Exit fullscreen mode

As you see on (2) in the image field of the yaml file, you need to write a container repository registry pointing to a docker image that will be use to build the kafka-connect container. Example: gustavo/kafka-connect-sql-server:latest, use your own. I use docker hub for this one.

And finally you need to create a kubernetes's secret, you can name it as you wish, for example: secret-kafka.

Here is an example:

kubectl create secret docker-registry secret-kafka -n kafka \
  --docker-email=your-user-email-goes-here \
  --docker-username=your-user-name-goes-here \
  --docker-password=your-password-goes-here
Enter fullscreen mode Exit fullscreen mode

note that in this example we are pointing to the the kafka namespace (-n kafka) if you are following the debizium tutorial, you should use the same namespace.

Creating the Debizium SQL Server Connector

Finally this how your kafka-connection should look like:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-sqlserver
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 1
  config:
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    database.hostname: your-sql-server-hostname
    database.port: you-sql-server-port
    database.user: your-sql-server-user
    database.password: your-sql-server-password
    database.dbname: your-sql-server-db
    database.server.name: your-sql-server-name
    topic.prefix: chose-a-prefix
    database.names: database-name
    database.encrypt: true
    database.trustServerCertificate: true
    database.include.list: database-list-comma-separated
    table.include.list: table-list-comma-separated
Enter fullscreen mode Exit fullscreen mode

Look the sql server connector docs for more details.
Example:
table.include.list = dbo.table0, dbo.table1, dbo.table2
topic.prefix = dev-prefix
database.names = SQLENTERPRISE

Consumers

Through this step you should have a running cluster using minikube.

The last step is to make some consumers to capture kafka events. You can refert to confluent documentation to install the confluent kafka client, this will help us interact with the kafka cluster.

Let's create a file that will help us to listen kafka events

decorators.py


from confluent_kafka import Consumer, KafkaError

# Define the Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': '10.10.1.1:9092',  # Replace with your Kafka broker(s) address
    'group.id': 'connect-cluster-id',
    'auto.offset.reset': 'earliest',  # Start consuming from the beginning of the topic
}

# Create a Kafka consumer instance
consumer = Consumer(consumer_config)

# Subscribe to a Kafka topic
consumer.subscribe(['dev-prefix.SQLENTERPRISE.dbo.table1', 'dev-prefix.SQLENTERPRISE.dbo.table2', 'dev-prefix.SQLENTERPRISE.dbo.table3'])  # Replace with the name of the topic you want to consume from
print("Subscribed to topic")

def loop(func):
    def wrapper():
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    # Handle any errors that occurred during consumption
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        print(f"Reached end of partition for topic {msg.topic()} partition {msg.partition()}")

                    else:                
                        print(f"Error while consuming message: {msg.error()}")
                else:
                    # kafka logic
                    print(f"Consumed message: {msg.value()}")  
                    func(msg)
        except KeyboardInterrupt:
            pass
        finally:
            # Close the Kafka consumer when done
            consumer.close()

    return wrapper
Enter fullscreen mode Exit fullscreen mode

main.py

from util.decorators import loop

import json

@loop
def main(msg):
    key = str(msg.topic()).replace("dev-prefix.SQLENTERPRISE.dbo.", '').lower()

    value = json.loads(msg.value().decode("utf-8"))
    payload = { key: value }

    if msg.topic() == "dev-prefix.SQLENTERPRISE.dbo.table0":
        print(payload)
    elif msg.topic() == "dev-prefix.SQLENTERPRISE.dbo.table1":
        print(payload)
    elif msg.topic() == "dev-prefix.SQLENTERPRISE.dbo.table2":
        print(payload)
    else:
        print("topic not found")


main()

Enter fullscreen mode Exit fullscreen mode

Finally you could replace the print(payload) to a POST call pointing to a microservice that you want to delegate mongoDB updates or you could do it rigth there.

And that it all! 🎉🎉

In the future, I will explain how to deploy it on production using google cloud.

Top comments (0)