
Abhishek Gupta for Microsoft Azure


Tutorial: How to use MongoDB Change Streams [Part 1]

This blog post demonstrates how to use Change Streams in MongoDB with the official Go driver. I will be using Azure Cosmos DB since it has wire protocol support for the MongoDB API (server version 3.6) which includes Change Streams as well.

Like some of my other blogs, I am going to split it into two parts to make the material easier to digest. Part 1 (this post) provides an introduction and an overview of the Change Streams processor service, and walks you through running the application so that you can see Change Streams in action.

In part 2, I will go over the code and explain how things work behind the scenes.

The code is available on GitHub

Over the course of these blogs, with the help of a practical application, you will learn about:

  • Change Streams and related concepts
  • The application and how it uses the Change Streams APIs
  • How to set up Azure Cosmos DB and work through an end-to-end example
  • Some of the corner cases, limitations and constraints you should be aware of

What are Change streams?

The MongoDB change streams feature gives applications instant access to data changes (creates, updates, deletes). Applications can react to these changes by subscribing to them at a global (deployment), database or collection scope. This can be used for a variety of solutions ranging from traditional ETL jobs to CQRS (Command and Query Responsibility Segregation) based architecture, real-time stream processing, cache invalidation and much more!

This shields developers from the complexity of using the MongoDB oplog directly.

How are they useful?

One of the many ways you can leverage Change streams is to build a custom solution which listens to MongoDB database change events and pushes them to a scalable data ingestion platform such as Azure Event Hubs for Kafka. You can then build traditional Kafka client applications (Consumer, Streams API) or leverage Serverless processing with Azure Functions (similar to this one).

I might blog about other scenarios and their respective solutions as follow-up posts.

At the time of writing, Azure Cosmos DB supports change stream integration with Azure Functions for its SQL API

Quick overview of the Change Processor Service

The application is a change processor service that uses the Change stream feature. It's a Go application that uses the official MongoDB Go driver but the concepts should be applicable to any other language whose native driver supports Change Streams.

It uses the Watch API to subscribe to the change events feed in a specific Collection so that it is notified of documents being created, updated and deleted. It extracts the relevant information from the change event payload i.e. the document which was affected and saves it locally to a file. It also demonstrates how to use Resume Tokens to save processing progress.

Although this application simply saves the change events to a local file (for the sake of simplicity), as mentioned before, you can obviously do much more with this!

Resume tokens are really important...

A Change stream is a potentially infinite series of records. Change stream Resume tokens allow you to "continue processing from where you left off" - think of it as checkpointing.

It is very similar to the offset concept in systems such as Apache Kafka.

If the processing application stops (or crashes), you usually do not want to miss the database changes which happened while it was down. You can use the token (details in the next section) to ensure that the application starts off from where it left off and picks up the change events from the period during which it was not running. In addition to this, if you have a history/changelog of resume tokens, you have the flexibility of choosing any of them (it's like walking back in time) and (re)processing data from that point in time.
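To make the checkpointing idea concrete, here is a minimal sketch of saving and reloading a token on disk. It is not the code from the repo: the function names saveToken and loadToken are hypothetical, and the token is treated as opaque bytes (with the official Go driver, these would presumably come from the change stream's ResumeToken method).

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
)

// saveToken persists the raw resume token bytes to path. It writes to a
// temporary file first and then renames it, so a crash in the middle of a
// write does not leave a truncated checkpoint behind.
// (Hypothetical helper, not taken from the sample application.)
func saveToken(path string, token []byte) error {
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, token, 0o600); err != nil {
		return fmt.Errorf("write temp token file: %w", err)
	}
	if err := os.Rename(tmp, path); err != nil {
		return fmt.Errorf("replace token file: %w", err)
	}
	return nil
}

// loadToken returns the last saved token, or nil if no checkpoint exists yet,
// in which case the caller would start the change stream without resuming.
func loadToken(path string) ([]byte, error) {
	token, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, nil
	}
	if err != nil {
		return nil, fmt.Errorf("read token file: %w", err)
	}
	return token, nil
}

func main() {
	dir, err := os.MkdirTemp("", "checkpoint-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "token")

	// First start: nothing has been saved yet.
	token, err := loadToken(path)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("first start, checkpoint found: %t\n", token != nil)

	// On shutdown, the processor would save the latest token it has seen.
	// The value below is a placeholder, not a real resume token.
	if err := saveToken(path, []byte("opaque-resume-token")); err != nil {
		log.Fatal(err)
	}

	// On restart, the saved token tells the processor where to resume.
	token, err = loadToken(path)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("restart, checkpoint found: %t (%d bytes)\n", token != nil, len(token))
}
```

Keeping each saved token (for example, one file per checkpoint) instead of overwriting a single file would give you the history of tokens mentioned above.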

There are a few things you should know

Please note the following cases where the Change streams behaviour differs from standard MongoDB:

Let's try out the application to understand Change streams better!

Setup Azure Cosmos DB

Pre-requisites

You need to create an Azure Cosmos DB account with the MongoDB API support enabled along with a Database and Collection. Follow these steps to set up Azure Cosmos DB using the Azure portal:

Learn more about how to Work with databases, containers, and items in Azure Cosmos DB

If you want to use the Azure CLI or Cloud Shell, here is the sequence of commands which you need to execute:

Create an Azure Cosmos DB account (notice --kind MongoDB)

az cosmosdb create --resource-group <RESOURCE_GROUP> --name <COSMOS_DB_ACCOUNT> --kind MongoDB

Create the database

az cosmosdb mongodb database create --account-name <COSMOS_DB_ACCOUNT> --name <COSMOS_DB_NAME> --resource-group <RESOURCE_GROUP>

Finally, create a collection within the database

az cosmosdb mongodb collection create --account-name <COSMOS_DB_ACCOUNT> --database-name <COSMOS_DB_NAME> --name <COSMOS_COLLECTION_NAME> --resource-group <RESOURCE_GROUP> --shard <SHARDING_KEY_PATH>

Get the connection string and save it. You will be using it later

az cosmosdb list-connection-strings --name <COSMOS_DB_ACCOUNT> --resource-group <RESOURCE_GROUP> -o tsv --query "connectionStrings[0].connectionString"

Try out the Change processor service

All you need to do is clone the GitHub repo, build and run the service

Start the service

git clone https://github.com/abhirockzz/mongodb-changestreams-processor
cd mongodb-changestreams-processor
go build -o change-processor

Before you run the app, export environment variables:

export MONGODB_URI=[enter the Azure Cosmos DB connection string]
export MONGODB_DATABASE=[name of the Azure Cosmos DB database you created]
export MONGODB_COLLECTION=[name of the Azure Cosmos DB collection you created]
export WITH_RESUME=[use false if you don't want to use resume tokens. this is true by default]

e.g.

export MONGODB_URI="mongodb://my-mongodb:<primary access key for cosmosdb>@my-mongodb.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@my-mongodb@"
export MONGODB_DATABASE=test_db
export MONGODB_COLLECTION=test_collection
export WITH_RESUME=false

Please ensure you wrap the MONGODB_URI value in double quotes: the connection string contains & characters, which the shell would otherwise interpret.
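As a rough illustration of how a Go program could consume these variables, here is a small sketch. It is an assumption about the behaviour described above rather than the actual code in the repo: the config type and the loadConfig and parseWithResume helpers are hypothetical, the three MongoDB variables are treated as required, and WITH_RESUME falls back to true when unset.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// config holds the settings read from the environment.
// (Hypothetical type, for illustration only.)
type config struct {
	URI        string
	Database   string
	Collection string
	WithResume bool
}

// parseWithResume treats an unset/empty value as true, matching the
// "true by default" behaviour described for WITH_RESUME.
func parseWithResume(value string) (bool, error) {
	if value == "" {
		return true, nil
	}
	enabled, err := strconv.ParseBool(value)
	if err != nil {
		return false, fmt.Errorf("WITH_RESUME must be true or false, got %q", value)
	}
	return enabled, nil
}

// loadConfig reads settings through getenv, so it can be fed os.Getenv in
// production and a map-backed lookup in tests.
func loadConfig(getenv func(string) string) (config, error) {
	cfg := config{
		URI:        getenv("MONGODB_URI"),
		Database:   getenv("MONGODB_DATABASE"),
		Collection: getenv("MONGODB_COLLECTION"),
	}
	required := []struct{ name, value string }{
		{"MONGODB_URI", cfg.URI},
		{"MONGODB_DATABASE", cfg.Database},
		{"MONGODB_COLLECTION", cfg.Collection},
	}
	for _, r := range required {
		if r.value == "" {
			return config{}, fmt.Errorf("missing environment variable %s", r.name)
		}
	}
	withResume, err := parseWithResume(getenv("WITH_RESUME"))
	if err != nil {
		return config{}, err
	}
	cfg.WithResume = withResume
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	// The URI is deliberately not printed because it embeds the access key.
	fmt.Printf("watching %s.%s (resume tokens: %t)\n", cfg.Database, cfg.Collection, cfg.WithResume)
}
```

Passing the lookup function in, instead of calling os.Getenv inside loadConfig, is only a design choice that keeps the sketch easy to test.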

Start the program

./change-processor

The program should be up and running with the following output: started change stream...

Create a few records

There are many ways of connecting to Azure Cosmos DB to perform database operations. You can obviously do it programmatically using your favorite language's MongoDB support, but for the purpose of testing this service, try the following options:

Once you create a record, you should see the following output in the program terminal:

saved change event to file: 'change_events'

You should see the change_events file, and it will contain the document which was created. For example, the following record created in Azure Cosmos DB...

{
    "name" : "foo55",
    "email" : "foo55@bar.com",
    "status" : "online"
}

... will be saved with an _id (MongoDB Object ID) in the change_events file:

{
  "_id": {
    "$oid": "5e8eec7712f1891a100bd449"
  },
  "name": "foo55",
  "email": "foo55@bar.com",
  "status": "online"
}

This is just a part of the change event - the complete payload looks like this:

The top-level _id field of the change stream event document acts as the resume token (more on this below).

{
  "_id": {
    "_data": {
      "$binary": {
        "base64": "W3sidG9rZW4iOiJcIjY5XCIiLCJyYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiJ9fV0=",
        "subType": "00"
      }
    }
  },
  "fullDocument": {
    "_id": {
      "$oid": "5e8eec7712f1891a100bd449"
    },
    "name": "foo56",
    "email": "foo56@bar.com",
    "status": "online"
  },
  "ns": {
    "db": "test_db1",
    "coll": "test_coll1"
  },
  "documentKey": {
    "name": "foo56",
    "_id": {
      "$oid": "5e8eec7712f1891a100bd449"
    }
  }
}
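To illustrate how the affected document can be pulled out of such a payload, here is a minimal sketch that parses the JSON form shown above using only the standard library. It is an illustration under stated assumptions, not the service's implementation: the changeEvent type and parseChangeEvent function are hypothetical, and the real service works with the driver's BSON types rather than JSON text.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// changeEvent mirrors the fields visible in the payload above.
// (Hypothetical type, for illustration only.)
type changeEvent struct {
	ResumeToken  json.RawMessage `json:"_id"`          // top-level _id: the resume token
	FullDocument json.RawMessage `json:"fullDocument"` // the document that was affected
	Namespace    struct {
		DB   string `json:"db"`
		Coll string `json:"coll"`
	} `json:"ns"`
}

// parseChangeEvent decodes a change event and checks that it carries the
// affected document.
func parseChangeEvent(payload []byte) (changeEvent, error) {
	var ev changeEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return changeEvent{}, fmt.Errorf("decode change event: %w", err)
	}
	if len(ev.FullDocument) == 0 || string(ev.FullDocument) == "null" {
		return changeEvent{}, errors.New("change event has no fullDocument")
	}
	return ev, nil
}

func main() {
	// Abbreviated version of the payload shown above; the token value is a
	// shortened placeholder.
	payload := []byte(`{
	  "_id": {"_data": {"$binary": {"base64": "W3sidG9rZW4iOiI=", "subType": "00"}}},
	  "fullDocument": {"name": "foo56", "email": "foo56@bar.com", "status": "online"},
	  "ns": {"db": "test_db1", "coll": "test_coll1"}
	}`)

	ev, err := parseChangeEvent(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("change in %s.%s\n", ev.Namespace.DB, ev.Namespace.Coll)
	fmt.Printf("document: %s\n", ev.FullDocument)
}
```

Keeping the resume token and the document as raw JSON means neither has to be interpreted by the processor; the token only needs to be stored and handed back later.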

Update records

For example, if I were to change the above record from "status":"online" to "status":"offline", the change would be recorded in the change_events file along with the updated document.

Checkpointing with Resume token

To try this out, stop your application (press Ctrl+C). You should see the following output before the program terminates:

^Cexit signalled. cancelling context
saved token to file

Check for a file named token. Its contents don't matter - just understand that it is there and it contains a resume token in binary form.

Now, create a few records while the change stream processor is not running. Once you have done that, restart the processor and you should see that the records you added were detected, processed and saved in the change_events file.

Delete records

Delete operations are not yet supported. One way to handle deletes, as suggested here, is to treat them as update operations: add an extra attribute to the document (e.g. deleted, set to true) and set a TTL (time-to-live) on it. This way, you still receive the change event as part of the change stream.

If I were to try this with the sample record, there should be an entry in the change_events file like this:

{
  "_id": {
    "$oid": "5e8eec7712f1891a100bd449"
  },
  "name": "foo55",
  "email": "foo55@bar.com",
  "status": "offline",
  "deleted": true
}

Notice "deleted": true
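A downstream consumer then needs to tell these "soft deletes" apart from regular updates. The sketch below shows one way this might be done for the JSON documents above; the isSoftDeleted helper and the attribute name deleted are assumptions based on the example in this post, not part of the sample application.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// softDeleteFlag picks out only the marker attribute; all other fields of
// the document are ignored. (Hypothetical type, for illustration only.)
type softDeleteFlag struct {
	Deleted bool `json:"deleted"`
}

// isSoftDeleted reports whether a document carries "deleted": true.
// A missing attribute is treated as "not deleted".
func isSoftDeleted(doc []byte) (bool, error) {
	var flag softDeleteFlag
	if err := json.Unmarshal(doc, &flag); err != nil {
		return false, fmt.Errorf("decode document: %w", err)
	}
	return flag.Deleted, nil
}

func main() {
	docs := []string{
		`{"name": "foo55", "email": "foo55@bar.com", "status": "offline"}`,
		`{"name": "foo55", "email": "foo55@bar.com", "status": "offline", "deleted": true}`,
	}
	for _, doc := range docs {
		deleted, err := isSoftDeleted([]byte(doc))
		if err != nil {
			fmt.Println("skipping document:", err)
			continue
		}
		if deleted {
			fmt.Println("treat as delete:", doc)
		} else {
			fmt.Println("treat as create/update:", doc)
		}
	}
}
```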

Alright! That's all for now. I will cover some of the implementation details in a follow-up blog post (Part 2).
