DEV Community

Cover image for How I Created an Event-Driven Backend with RxJS, Server-Sent Events, Express and NodeJS
Shayan
Shayan

Posted on • Edited on

How I Created an Event-Driven Backend with RxJS, Server-Sent Events, Express and NodeJS

About a month ago, a friend and I came up with an idea for a small website and decided to create an MVP in a couple of days to give it a shot. The idea was pretty simple; a meme-driven chat room and a live price chart for each cryptocurrency. I was assigned to create the backend for the project, and my friend would make the web client.

I set three requirements for myself before starting to work on the server. First and foremost, I wanted to get the MVP out as soon as possible. Secondly, I wanted to make the server as lightweight as possible to just leave it running on a cheap VM. Lastly, I wanted to design the architecture to allow for easy scaling if the shit coin investors decide to make another stupid decision and use our application.

The first step was to think of the entire server as a pipeline. Basically, the whole thing is a pipeline that consumes a set of events, processes them, and then streams them to many clients.

Alt Text

Any incoming data can be considered as some sort of event. For example, we have things like users joining or leaving a room, publishing or deleting comments, and ticker price updates. On the other side, anything that consumes these events can be considered as a client; our database, cache, and every connected browser.

After giving this a bit more thought, the first thing that came to my mind was a combination of RxJS and Server-Sent Events. I have never written a server-side javascript project as I mainly use Golang and Python, but I really wanted to experiment with RxJS on the server-side, and this seemed to be the perfect time to give it a shot.

Alt Text

To explain the implementation in more detail, I will walk through implementing a very simple stock/cryptocurrency live price streaming endpoint. First, we need a RxJS Subject which is a multicast observable. Our subject will take care of streaming the ticker price events to all of our clients (subscribers).

import { Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';
// our RxJS subject
const TickerSubject = new Subject();
Enter fullscreen mode Exit fullscreen mode

Next, we need an entry point for our events to be pushed to our stream. We can either emit directly to our subject or create a wrapper function as an abstraction layer and sanitize and check our data before emitting new events.

To do so, let's define a function called EmitTickerPrice. Every time we get a new ticker price data, we will call this method with the proper parameters, and it will emit a new event to our ticker subject.

/**
 * Emit a new ticker price
 * @param {string} symbol: ticker symbol
 * @param {string} price: ticker price
 * @param {string} currency: ticker currency
 */
const EmitTickerPrice = async (symbol, price, currency) => {
    const ticker = {
        symbol, price, currency,
        createdAt: Math.floor(new Date() / 1000),
    };
    TickerSubject.next(ticker);
    return ticker;
};
Enter fullscreen mode Exit fullscreen mode

For our project, I am using PostgreSQL to persist historical ticker price information. I am also using Redis as a cache store to reduce the database load when clients request the data to render the price chart. As I mentioned previously, each of these is considered a client and independently subscribed to our RxJS subjects. We can call subscribe on our ticker subject and pass a callback method to observe and handle each incoming event.

TickerSubject.subscribe(ticker => {})
Enter fullscreen mode Exit fullscreen mode

Pretty easy, right? Well, not really. See, for our website, we are pushing a new ticker price event every five seconds for each supported stock and cryptocurrency ticker. These events are also not synchronized and come in at different intervals, which means that we get dozens of ticker events every second. The problem is that we don't want to call our Redis and PostgreSQL subscriber callback every time a new event is emitted. Instead, we want to implement some additional logic in our pipeline to reduce the load on these services.

PostgreSQL Observer

Let's start with PostgreSQL; inserting a new row individually each time a new ticker price is emitted is not ideal. This may differ for the different projects as, in some cases, we may need atomic inserts. However, for this project, the 30 seconds insertion delay was negligible. Luckily, RxJS makes it very easy to implement this feature by providing pipelines and dozens of operators. For our case, we can create a pipe and use the bufferTime operator to buffer our events for 30,000 milliseconds. Then, we can subscribe to the newly defined pipeline.

Let's start with PostgreSQL; inserting a new row individually each time a new ticker price is emitted is not ideal. This may differ for the different projects as, in some cases, we may need atomic inserts. However, for this project, the 30 seconds insertion delay was negligible. Luckily, RxJS makes it very easy to implement this feature by providing pipelines and dozens of operators. For our case, we can create a pipe and use the bufferTime operator to buffer our events for 30,000 milliseconds. Then, we can subscribe to the newly defined pipeline.

import { bufferTime } from 'rxjs/operators';
TickerSubject.TickerSubject.pipe(
    bufferTime(30000),
).subscribe(tickers => {})
Enter fullscreen mode Exit fullscreen mode

Our subscriber is called every 30 seconds, and it gets a list of buffered events in the past buffer period instead.

Redis Observer

Our problem gets a bit more interesting with Redis. As I mentioned previously, Redis is mainly used to cache the price points needed to generate the price chart displayed on the website.

Alt Text

This chart is created for the different intervals such as the past 5 minutes, one hour, or a day. As you can tell by now, we don't need a data point every 5 seconds for our 24-hour chart; instead, a data point every 30 minutes or even an hour would do the job.

Our Redis observer should throttle each unique ticker symbol for 30 minutes before calling the subscriber. To achieve this, we need to create a bit more complicated pipeline than what we previously had for the PostgreSQL observer.

Alt Text

First, we have to group our events based on their ticker symbol. To do so, we can use the groupBy operator provided by RxJS and provide an arrow function to specify how we are grouping these events. We want the group our events based on their ticker symbols; hence, we return the ticker symbol value from our arrow function.

Next, we will throttle each group to emit once every 30 minutes and finally merge all the groups into a single pipeline. We can use the mergeMap operator and map through each group to add the throttleTime operator with a 30-minute interval. Finally, we can subscribe to the pipeline and insert the data into our Redis server.

import { groupBy, mergeMap, throttleTime } from 'rxjs/operators';
TickerSubject.pipe(
    groupBy((ticker) => ticker.symbol),
    mergeMap((group) => group.pipe(
        throttleTime(30 * 60 * 1000),
    )),
).subscribe(ticker => {})
Enter fullscreen mode Exit fullscreen mode

We can even go further and buffer these events to take advantage of Redis pipelines, but I will skip that part as it will look almost identical to what we did with our PostgreSQL pipeline.

If you made it thus far, pat yourself on the back, take a deep breath and go get some coffee before we get our hands dirty with server-sent events.

Server-Sent Events Endpoint

For our website, I am using ExpressJS and the @awaitjs/express library to use async/await in my routers. Register the path /ticker/:symbol/event via GET method on our express server to create our server-sent events route.

Router.getAsync('/ticker/:symbol/event', async (req, res) => {})
Enter fullscreen mode Exit fullscreen mode

To enable SSE, we need to flush a couple of headers back to our client. We want the Connection set to keep-alive, Cache-Control set to no-cache and Content-Type set to text/event-stream so our client would understand that this is an SSE route.

In addition, I have added Access-Control-Allow-Origin to for CORS and X-Accel-Buffering set to no to avoid Nginx from messing with this route. Finally, we can flush the headers back to our client to kickstart the event stream.

Router.getAsync('/ticker/:symbol/event',
    async (req, res) => {
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Connection', 'keep-alive');
        res.setHeader('Access-Control-Allow-Origin', '*');
        res.setHeader('X-Accel-Buffering', 'no');
        res.flushHeaders();
});
Enter fullscreen mode Exit fullscreen mode

We can now start streaming data by writing something into our response. SSE provides a text-based protocol that we can use to help our clients differentiate between the event types. Each one of our events should look like the following:

event: ${event name}\n
data: ${event data}\n\n
Enter fullscreen mode Exit fullscreen mode

To make our lives a bit easier, I have created a helper function to take care of serialization for us.

/**javascript
 * SSE message serializer
 * @param {string} event: Event name
 * @param {Object} data: Event data
 * @returns {string}
 */
const EventSerializer = (event, data) => {
    const jsonString = JSON.stringify(data);
    return `event: ${event}\ndata: ${jsonString}\n\n`;
};
Enter fullscreen mode Exit fullscreen mode

On our website, we have half a dozen subjects similar to what we have created so far. To be able to differentiate between these events, we have to assign an event name to each. Let's use price_update for the ticker subject. In addition, we need to filter these events based on the dynamic path that our client has subscribed. For example, on /ticker/DOGE/event, we only want events related to Dogecoin. To implement these two features, let's create a new wrapper around our ticker subject to filter the pipeline and add our event name to the events.

import { filter, map } from 'rxjs/operators';
/**
 * Event stream for ticker price update
 * @param {string} symbol: ticker symbol
 * @returns {Observable<{data: *, name: string}>}
 */
function EventTickerStream(symbol) {
   return TickerSubject
      .pipe(
         filter((ticker) => ticker.symbol === symbol),
         map((ticker) => {
            return { data: ticker, name: 'price_update' };
         }
      ),
   );
}
Enter fullscreen mode Exit fullscreen mode

All left to do is merge these events into a single pipeline and create a new subscriber to write them into the SSE connection. We can take use the of operator to create a pipeline from all of our subjects. Then, we use the mergeAll operator to collect and merge all of our observables into a single observable. Then, we can subscribe to the observable, serialize our data and write it to our response. Finally, we have to make sure to unsubscribe from our observer when the SSE connection is closed. Putting all of these together, we should have something like the following

import { of } from 'rxjs';
import { mergeAll } from 'rxjs/operators';
Router.getAsync('/ticker/:symbol/event',
    async (req, res) => {
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Connection', 'keep-alive');
        res.setHeader('Access-Control-Allow-Origin', '*');
        res.setHeader('X-Accel-Buffering', 'no');
        res.flushHeaders();
        const symbol = req.params.symbol.toUpperCase();
        const stream$ = of(
            EventTickerStream(symbol),
            // other events ...
        ).pipe(
            mergeAll(),
        ).subscribe((event) => {
            res.write(EventSerializer(event.name, event.data));
        });
        req.on('close', () => {
            stream$.unsubscribe();
        });
});
Enter fullscreen mode Exit fullscreen mode

Aaannddd… that's it! We are done with our backend server. 
Here's an overall view of what we have created so far.

Alt Text

Server-Sent Events Client

To subscribe to our SSE route, we can create a new instance of the EventSource interface and pass our endpoint to the constructor. Once we have an instance, we can add event handlers for specific event names to process the incoming data. In our case, we can subscribe to the price_update event for Dogecoin and use the data to update our UI.

const eventSource = new EventSource("/ticker/DOGE/event");
eventSource.addEventListener(
   "price_update", (event) => {
       const data = JSON.parse(event.data);
       // use the data to update the UI
    }, false
);
// close the connection when needed
eventSource.close();
Enter fullscreen mode Exit fullscreen mode

At the end of the day, I am pleased about this architecture as it satisfies most of my requirements for this project. Going with a reactive design allowed me to implement many complex features more efficiently and less error-prone than an imperative model. Higher-level functions provided by RxJS, such as throttleTime and bufferTime solved many of my problems very quickly and saved me a lot of development time. Completing the first iteration of the MVP took us about 4 days.

I also wanted to deploy our services on the smallest virtual machine to reduce costs and benchmark server-side performance. Thus, I went with the $5/month digital ocean droplet. Over the last week, our server has served over 3.7M requests and over 120M events, and at one point, we had over 500 concurrent clients, which I think is a pretty damn good benchmark.

In terms of scalability, we still have a lot of room to grow vertically and increase the resources available on the VM. But if we want to grow horizontally, the current architecture allows us to deploy proxies that subscribe to our pipelines, either through our SSE endpoint or to the subjects over the network, and then multiplex the events to more clients.

That concludes the discussion on how I implemented an event-driven server for our project. You can check out the final result at Monke Cafe.

Thank you for reading; if you would like to chat, you can find me on Twitter @imsh4yy or via responses here.

Update: I have recently started working on a new project and have been using the same architecture design for pushing down information to my users. I would love to hear your feedback on the project: checkridehq.com, LogSnag - Track your projects' events

Top comments (1)

Collapse
 
mtyiska profile image
mtyiska

Excellent article. Could you share the repo for this?