DEV Community

Cover image for Part 3: Transforming MongoDB CDC Event Messages
avital trifsik for Memphis.dev

Posted on • Updated on • Originally published at memphis.dev

Part 3: Transforming MongoDB CDC Event Messages

This is part three of a series of blog posts on building a modern event-driven system using Memphis.dev.

In our last blog post, we introduced a reference implementation for capturing change data capture (CDC) events from a MongoDB database using Debezium Server and Memphis.dev. At the end of the post we noted that MongoDB records are serialized as strings in Debezium CDC messages like so:

{
    "schema" : ...,

"payload" : {
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",

...
}
}
Enter fullscreen mode Exit fullscreen mode

We want to use the Schemaverse functionality of Memphis.dev to check messages against an expected schema. Messages that don’t match the schema are routed to a dead letter station so that they don’t impact downstream consumers. If this all sounds like ancient Greek, don’t worry! We’ll explain the details in our next blog post.

To use functionality like Schemaverse, we need to deserialize the MongoDB records as JSON documents. In this blog post, we describe a modification to our MongoDB CDC pipeline that adds a transformer service to deserialize the MongoDB records to JSON documents.


Overview of the Solution

The previous solution consisted of six components:

  1. Todo Item Generator: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds. Each todo item contains a description, creation timestamp, optional due date, and completion status.

  2. MongoDB: Configured with a single database containing a single collection (todo_items).

  3. Debezium Server: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors.

  4. Memphis.dev REST Gateway: Uses the out-of-the-box configuration.

  5. Memphis.dev: Configured with a single station (todo-cdc-events) and single user (todocdcservice).

  6. Printing Consumer: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console.

mongocdcd example

In this iteration, we are adding two additional components:

  • Transformer Service: A transformer service that consumes messages from the todo-cdc-events station, deserializes the MongoDB records, and pushes them to the cleaned-todo-cdc-events station.

  • Cleaned Printing Consumer: A second instance of the printing consumer that prints messages pushed to the cleaned-todo-cdc-events station.

Our updated architecture looks like this:

data flow diagram


A Deep Dive Into the Transformer Service

Skeleton of the Message Transformer Service

Our transformer service uses the Memphis.dev Python SDK. Let’s walk through the transformer implementation. The main() method of our transformer first connects to the Memphis.dev broker. The connection details are grabbed from environmental variables. The host, username, password, input station name, and output station name are passed using environmental variables in accordance with suggestions from the Twelve-Factor App manifesto.

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])
Enter fullscreen mode Exit fullscreen mode

Once a connection is established, we create consumer and producer objects. In Memphis.dev, consumers and producers have names. These names appear in the Memphis.dev UI, offering transparency into the system operations.

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")
Enter fullscreen mode Exit fullscreen mode

The consumer API uses the callback function design pattern. When messages are pulled from the broker, the provided function is called with a list of messages as its argument.

  print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)
Enter fullscreen mode Exit fullscreen mode

After setting up the callback, we kick off the asyncio event loop. At this point, the transformer service pauses and waits until messages are available to pull from the broker.

Keep your main thread alive so the consumer will keep receiving data

await asyncio.Event().wait()


Creating the Message Handler Function

The create function for the message handler takes a producer object and returns a callback function. Since the callback function only takes a single argument, we use the closure pattern to implicitly pass the producer to the msg_handler function when we create it.

The msg_handler function is passed three arguments when called: a list of messages, an error (if one occurred), and a context consisting of a dictionary. Our handler loops over the messages, calls the transform function on each, sends the messages to the second station using the producer, and acknowledges that the message has been processed. In Memphis.dev, messages are not marked off as delivered until the consumer acknowledges them. This prevents messages from being dropped if an error occurs during processing.

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler
Enter fullscreen mode Exit fullscreen mode

The Message Transformer Function

Now, we get to the meat of the service: the message transformer function. Message payloads (returned by the get_data() method) are stored as bytearray objects. We use the Python json library to deserialize the messages into a hierarchy of Python collections (list and dict) and primitive types (int, float, str, and None).

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)
Enter fullscreen mode Exit fullscreen mode

We expect the object to have a payload property with an object as the value. That object then has two properties (“before” and “after”) which are either None or strings containing serialized JSON objects. We use the JSON library again to deserialize and replace the strings with the objects.

 if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)
Enter fullscreen mode Exit fullscreen mode

Lastly, we reserialize the entire JSON record and convert it back into a bytearray for transmission to the broker.

  output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

Enter fullscreen mode Exit fullscreen mode

Hooray! Our objects now look like so:

{
"schema" : ...,

"payload" : {
"before" : null,

"after" : {
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}
Enter fullscreen mode Exit fullscreen mode

Running the Transformer Service

If you followed the 7 steps in the previous blog post, you only need to run three additional steps. to start the transformer service and verify that its working:

Step 8: Start the Transformer Service

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s
Enter fullscreen mode Exit fullscreen mode

Step 9: Start the Second Printing Consumer

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s
Enter fullscreen mode Exit fullscreen mode

Step 10: Check the Memphis UI

When the transformer starts producing messages to Memphis.dev, a second station named "cleaned-todo-cdc-events" will be created. You should see this new station on the Station Overview page in the Memphis.dev UI like so:

Check memphis ui

The details page for the "cleaned-todo-cdc-events" page should show the transformer attached as a producer, the printing consumer, and the transformed messages:

Image description

Congratulations! We’re now ready to tackle validating messages using Schemaverse in our next blog post. Subscribe to our newsletter to stay tuned!

Head over to Part 4: Validating CDC Messages with Schemaverse to learn further


In case you missed parts 1 & 2:

Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev

Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events


Originally published at Memphis.dev By RJ Nowling, Developer advocate at Memphis.dev


Follow Us to get the latest updates!
GithubDocsDiscord

Top comments (0)