DEV Community

Cover image for Robust MongoDB Change Stream implementation
Sibelius Seraphini for Woovi

Posted on • Updated on

Robust MongoDB Change Stream implementation

Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog.

Change streams enable you to update other data sources like a search index in elastic search, or publish events to subscribers.

A naive implementation would listen to some collections, but won't handle the case your service that is listening crashes or has some downtime when doing a rollout deployment.

This article provides a more robust solution that will avoid making you lose any change stream event, even when your service crashes or get some downtime.

Robust Change Streams using a Resume Token

const stream = model.watch([], {
  fullDocument: 'updateLookup',
});

stream.on('change', (data) => {})
Enter fullscreen mode Exit fullscreen mode

A stream will emit a ChangeStreamData for each data change in your collection.

For each data change it will also provide a resumeToken, that will let you resume the processing from this point in time.

A resumeToken is like a cursor in cursor-based pagination.

The implementation to use a resume token is easy as that:

const changeStreamListen = async <T extends any>(
  model: Model<T>,
  fn: (data: ChangeStreamData<T>) => void,
) => {
  const name = model.collection.name;
  const key = `resumetoken:${name}`;
  const resumeToken = await RedisCache.get(key);

  const getResumeOptions = () => {
    if (resumeToken) {
      return {
        resumeAfter: resumeToken,
      };
    }

    return {};
  };

  const stream = model.watch([], {
    fullDocument: 'updateLookup',
    ...getResumeOptions(),
  });

  stream.on('change', changeStreamMiddleware(name, fn)).on('error', (err) => {
    // eslint-disable-next-line
    console.log('change error:', err);
    Sentry.setExtra('error', err);
    Sentry.captureException(err);
  });
};
Enter fullscreen mode Exit fullscreen mode

This implementation saves the resume token for a given collection in Redis, and tries to resume from it if available.

In Summary

Change Streams can simplify a lot your software architecture.
Providing a robust implementation of it is important to make sure you won't lose any data change.

References


Woovi
Woovi is a Startup that enables shoppers to pay as they like. Woovi provides instant payment solutions for merchants to accept orders to make this possible.

If you want to work with us, we are hiring!

Top comments (0)