DEV Community

Cover image for MongoDB Change Streams
Ramy Farid
Ramy Farid

Posted on

MongoDB Change Streams

In this post I am going to talk about a very powerful feature in MongoDB called "Change Streams".

I first discovered change streams of MongoDB when I was developing a node.js web service that required an operation to be done on newly inserted documents once they are in the database, without the need of constant polling or manual intervention.

In other words, I wanted to watch events happening on a specific collection, like when a document is inserted or updated.

What to notice in this case is that the service trying to watch the changes is not the same as the service/operation causing the change to happen, and this is the power of this feature; it's like we are telling the database to report events to us. The next section will elaborate on this.

Imagine you have a scenario where we have two separate backend services, different codebases, that are connecting to our same MongoDB. Let's call them Service A and Service B.

Let's imagine Service A being some sort of an IoT data aggregation backend constantly updating the database with data from IoT devices.

Service B is another backend, maybe built later, that wants to subscribe to the incoming IoT data and report the data to its front-end clients.

To achieve this goal, we could have made Service B poll the database every specific interval to check wether there's new data to fetch or not. The polling solution will put the load on Service B to keep asking for its data and check wether something new is inserted. In some cases, this is not desired.

"Change Streams" allows Service B to say "well, I want the DB to tell me whenever new data arrives in a specific collection" and until that happens, Service B shouldn't worry about doing anything.

The code in Service B will look something like this,

   const conn = new Mongo("your_mongo_URL);
   const db = conn.getDB('ourDB');
   const collection = db.collection('data');

   const changeStream = collection.watch();

   changeStream.on('change', (data) => {
      // do something when there is a change in the data 
      // collection

   });

Enter fullscreen mode Exit fullscreen mode

What the code above does is that: it establishes a connection to the mongodb instance in required to subscribe to. The codes then specifies a specific collection data that it will watch on.

One way to specify the kind of event we are interested in will be done using the next code snippet

   changeStream = collection.any.watch([
      {
        $match: {
          $or: [
            { operationType: "insert" },
            { operationType: "update" },
            { operationType: "delete" },
          ],
        },
      },
    ])

Enter fullscreen mode Exit fullscreen mode

The next code snippet illustrates how we can check against the kind of change that's fired and decide the kind of operation we want to do.

  changeStream.on("change", async (data) => {
      switch (data.operationType) {
        case "insert":
          console.log("Document inserted", data.fullDocument)
          break
        case "update":
          console.log("Document updated", data.fullDocument)
          break
        case "delete":
          console.log("Document deleted", data.fullDocument)
          break
        default:
          break
      }
    })

Enter fullscreen mode Exit fullscreen mode

In that fashion, Service B can easily detect any event happening in the database.

I hope this article shed light on some feature you might need in your next project.

Discussion (0)