DEV Community

Vladimir Ivanenko
Vladimir Ivanenko

Posted on

Batching events with RxJS

What's that

Roughly speaking event batching is accumulating events for some time to process them all at once later.

We can employ two well-known strategies or even combine them to understand when it's time to flush and process the bunch of events buffered:

  • when the number of events reaches a threshold;
  • using an interval — e.g. flush events every 10 seconds.

Why use it

Event batching could be beneficial for:

  • performance reasons, for example, to reduce the number of writes into persistent storage or to optimize the amount of data transmitted over a network;
  • aggregation — e.g. to group by link and count page visits.

How to

Implementing event batching in JavaScript with RxJS is a no-brainer.

Let's start with a Node example.

const EventEmitter = require('events');
const { fromEvent, bufferCount } = require('rxjs');

// I assume you already have an instance of EventEmitter in your app. 
// In case I'm wrong, let's create the one.
const eventEmitter = new EventEmitter();

// listen to an event called `something-good-happened`
fromEvent(eventEmitter, 'something-good-happened')
   // accumulate events
  .pipe(
    // and flush them every time it's number reaches 3
    bufferCount(3),
    // let's log it
    tap(() => {
      console.log(
        `Great! The number of good things happened in a row reached ${events.length}. It's time to celebrate.`
      );
      console.log(events);
    })
  )
  // process the batch
  .subscribe((events) => {
    const goodThingsByUser = {};
    for (const event of events) {
        goodThingsByUser[event.userId] = (goodThingsByUser[event.userId] ?? 0) + 1;
    }
    // reportGoodThingsDone(goodThingsByUser);
  });
Enter fullscreen mode Exit fullscreen mode

And of course, an example for a browser.

import { fromEvent, bufferTime, filter } from "rxjs";

// listen to clicks on the whole document
const clicks$ = fromEvent(
  document.documentElement,
  "click",
  // selecte only properties we need
  (event) => ({
    type: event.type,
    time: new Date(),
    x: event.x,
    y: event.y
  })
);

clicks$
  .pipe(
    // flush events every 1 second
    bufferTime(1000),
    // move next only if there is at least one event
    filter((events) => events.length > 0)
  )
  // process the batch
  .subscribe((events) => {
    fetch("/my-analytics", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(events)
    });
  });
Enter fullscreen mode Exit fullscreen mode

One more live example here.

Note that there is no silver bullet and each solution has its drawbacks.
Bulk processing of a large bunch of events could dramatically worse the performance of your app because of blocking the main thread, which you should avoid at all costs. In case you expect to process lots of data consider using a message queue. Look at BullMQ for example.

Thank you for reading!

Could you share some examples of applying event batching in your projects?

Top comments (0)