Caio Campos Borges Rosa for Woovi

How to use mongoDB change streams as a powerful event-driven engine

Before change streams became a feature of MongoDB, developers who wanted to track real-time changes in the database used to monitor oplog entries and track the changes in a specific collection based on timestamps. This process was often complex, and the mechanisms required for resuming and recovering reads were not particularly robust.

Change streams give applications a direct, real-time interface to changes in database collections, along with powerful features for building custom event-driven architectures.

Change streams are available when MongoDB runs as a replica set. A change is only delivered to the stream after it has been committed to a majority of the replica set's data-bearing members, so the events you receive reflect data that will survive failures such as a primary election.

To utilize change streams in a local development environment, we configure a replica set using Docker Compose. For detailed instructions on this setup, you can refer to the guide written by @sibelius. We have invested considerable effort to ensure that all our developers can easily set up a local environment and work with the full range of features that MongoDB offers.

Setup

At Woovi we use change streams in a specific service called Publisher. This service is an app that instantiates a subscriber for each collection in our database from which we want to derive events.

import { companySubscriber } from './CompanySubscriber';
import { customerSubscriber } from './CustomerSubscriber';
import { userSubscriber } from './UserSubscriber';

export const setupSubscribers = () => {
  companySubscriber();
  userSubscriber();
  customerSubscriber();
};

Events

The role of the subscriber is to monitor a collection and generate a data event. This event is then passed to a handler, which processes the data according to our specific needs.

export const userSubscriber = async () => {
  const stream = User.watch([], { fullDocument: 'updateLookup' });

  stream.on('change', (data) => handleUserSubscriberEvent(data));
};

The stream output changes based on the event that occurs on a particular collection. MongoDB has many change events, as seen in the documentation.
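Since each event carries an operationType field, a handler can branch on it before doing any work. The following is only a sketch of that idea: actionForOperation and the ChangeAction names are hypothetical and not part of our Publisher service, and only a few of the documented operation types are handled.

```typescript
// Hypothetical action names, chosen for illustration only.
type ChangeAction = 'index' | 'remove' | 'ignore';

// Map a change event's operationType to what a search-indexing
// handler might do with it. Unlisted types are ignored.
export const actionForOperation = (operationType: string): ChangeAction => {
  switch (operationType) {
    case 'insert':
    case 'update':
    case 'replace':
      // The document exists after these operations, so it can be (re)indexed.
      return 'index';
    case 'delete':
      // Delete events identify the document through documentKey only.
      return 'remove';
    default:
      // e.g. 'drop', 'rename', 'invalidate'
      return 'ignore';
  }
};
```

Inside a subscriber this could be called as actionForOperation(data.operationType) before handing the event to the handler.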

The second parameter of watch is an options object. Setting fullDocument: 'updateLookup' makes update events include the full current document and not just the updated fields, which is the default. Each event type has its own payload, and there are several options you can use to tailor the stream to your application's needs.

The first parameter is an array of aggregation pipeline stages you can pass to filter or modify the stream output.

const pipeline = [
  // Only emit events whose full document belongs to this user
  { $match: { 'fullDocument.username': 'alice' } },
  // Attach an extra field to every emitted event
  { $addFields: { newField: 'this is an added field!' } },
];

const stream = User.watch(pipeline, { fullDocument: 'updateLookup' });
stream.on('change', (data) => handleUserSubscriberEvent(data));
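A pipeline can also filter on the event itself, for example on operationType. As a rough sketch, the helper below builds a single $match stage from a list of operation types and an optional username. buildUserPipeline and PipelineStage are hypothetical names used only for illustration.

```typescript
// Loose stage type so the sketch does not depend on the driver's typings.
type PipelineStage = Record<string, unknown>;

// Build a change stream pipeline that keeps only the given operation
// types and, optionally, only documents with the given username.
export const buildUserPipeline = (
  operationTypes: string[],
  username?: string,
): PipelineStage[] => {
  const match: Record<string, unknown> = {
    operationType: { $in: operationTypes },
  };

  if (username !== undefined) {
    match['fullDocument.username'] = username;
  }

  return [{ $match: match }];
};
```

The result can be passed as the first argument of watch, for example User.watch(buildUserPipeline(['insert', 'update'], 'alice'), { fullDocument: 'updateLookup' }).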

Use cases

Finally, in our handleUserSubscriberEvent function, we use the data object to drive any event-driven service within our application environment. In our specific case, we use it to create and update indices in our Elasticsearch service, which is the core technology behind our internal search tools.


export const handleUserSubscriberEvent = (data) => {
  const dataPicked = {
    _id: data.fullDocument._id,
    name: data.fullDocument.name,
    email: data.fullDocument.email,
    cellphone: data.fullDocument.cellphone,
    taxID: data.fullDocument.taxID,
  };

  const obj = {
    data: dataPicked,
  };

  handleDocumentIndexing(ELASTICSEARCH_INDEXES.USER, obj);
};
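One detail to keep in mind with a handler like this: delete events do not carry a fullDocument, and with 'updateLookup' the field can be null if the document was removed before the lookup ran. A minimal sketch of a guard follows, assuming a simplified event shape. UserChangeEvent and pickUserIndexData are hypothetical names, not the code we run in production.

```typescript
// Simplified, assumed shape of the events our user handler receives.
type UserChangeEvent = {
  operationType: string;
  fullDocument?: {
    _id: unknown;
    name?: string;
    email?: string;
    cellphone?: string;
    taxID?: string;
  } | null;
};

// Return the payload to index, or null when the event has no document.
export const pickUserIndexData = (event: UserChangeEvent) => {
  const doc = event.fullDocument;

  if (!doc) {
    return null;
  }

  return {
    data: {
      _id: doc._id,
      name: doc.name,
      email: doc.email,
      cellphone: doc.cellphone,
      taxID: doc.taxID,
    },
  };
};
```

The handler could then skip indexing whenever pickUserIndexData returns null.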

You can use change streams as a powerful engine for event-driven applications such as:

  • Analytics Processing
  • Notifications
  • IoT integration with MQTT

The performance of MongoDB change streams allows Woovi to scale more event-driven products quickly and securely.

Woovi is a startup that enables shoppers to pay as they like. To make this possible, Woovi provides instant payment solutions for merchants to accept orders.

If you want to work with us, we are hiring!
