Event Emitters and Async Generators
I recently came across a situation where I needed to stream realtime updates from server to client. After some research, I opted not to go with the defacto solution of web sockets, and instead went with the equally well-supported approach of Server Sent Events (SSE).
SSE is a one-directional communication channel with an impressively simple browser API:
// establish connection
const eventSource = new EventSource(url);
// listen and handle events
eventSource.addEventListener(eventName, eventHandler);
// close connection
eventSource.close();
If the connection is interrupted without explicitly being closed by the client, the browser will automatically attempt to reestablish the connection.
On the server side, I used the Fastify SSE Plugin which supports using an event emitter to handle the firing of events.
Here's a simplified version of a GET /rates
endpoint used to subscribe to receive exchange rates:
import fastify from "fastify";
import { FastifySSEPlugin } from "fastify-sse-v2";
import { EventEmitter, on } from "events";
const eventEmitter = new EventEmitter();
const server = fastify();
server.register(FastifySSEPlugin);
server.get("/rates", (_request, reply) => {
reply.sse(
(async function* () {
for await (const [payload] of on(eventEmitter, "ratesUpdated")) {
yield {
data: JSON.stringify(payload),
event: "update",
};
}
})()
);
});
The async generator β async function* ()
β is what allows us to listen to events fired by the event emitter.
It's a good idea to use an abort controller to clean up when the connection drops. Here's what the code now looks like:
import fastify from "fastify";
import { FastifySSEPlugin } from "fastify-sse-v2";
import { EventEmitter, on } from "events";
const eventEmitter = new EventEmitter();
const server = fastify();
server.register(FastifySSEPlugin);
server.get("/rates", (request, reply) => {
const abortController = new AbortController();
request.socket.on("close", () => {
abortController.abort();
});
reply.sse(
(async function* () {
for await (const [payload] of on(eventEmitter, "ratesUpdated", { signal: abortController.signal })) {
yield {
data: JSON.stringify(payload),
event: "update",
};
}
})()
);
});
We can extract the async generator into a reusable and testable unit:
import { EventEmitter, on } from "events";
import { EventMessage } from "fastify-sse-v2";
interface Params {
abortController: AbortController;
eventEmitter: EventEmitter;
eventName: string;
}
function makeEventListenerGenerator({
abortController,
eventEmitter,
eventName,
}: Params) {
return async function* (): AsyncGenerator<EventMessage> {
for await (const [data] of on(
eventEmitter,
eventName,
{ signal: abortController.signal }
)) {
yield {
data: JSON.stringify(data),
event: "update",
};
}
};
}
This function can then be used in the GET /rates
handler as follows:
reply.sse(
makeEventListenerGenerator({
abortController,
eventEmitter,
eventName: "ratesUpdated",
})()
);
Writing the Test
Before we can test our makeEventListenerGenerator
function, it's important to understand that it returns an async generator function. Calling this function returns an async iterator: an object that can generate a sequence of values asynchronously.
The on
function, which we imported from node's events
module, is roughly equivalent to the browser's addEventListener
method. We can subscribe to events that are fired by the event emitter using the on
function.
Firing events is done using the event emitter's emit
method.
Here's the whole flow of publishing and consuming events:
import { EventEmitter, on } from "events";
const eventEmitter = new EventEmitter();
const iterator = on(eventEmitter, "ping");
eventEmitter.emit("ping", { key: "value" });
await iterator.next(); // => { value: [{ key: "value" }], done: false }
Armed with this knowledge, we can now unit test the makeEventListenerGenerator
function:
import { EventEmitter } from "events";
import { describe, expect, test } from "vitest";
import { makeEventListenerGenerator } from "./makeEventListenerGenerator";
describe("makeEventListenerGenerator", () => {
test("iterates over emitted events", () => {
const abortController = new AbortController();
const eventEmitter = new EventEmitter();
const eventName = "ratesUpdated";
const eventPayload = [{ from: "USD", to: "EUR", rate: 0.94 }];
const eventIterator = makeEventListenerGenerator({
abortController,
eventEmitter,
eventName,
})();
(async () => {
expect(await eventIterator.next()).toHaveProperty("value", {
data: JSON.stringify(eventPayload),
event: "update",
});
})();
eventEmitter.emit(eventName, eventPayload);
});
});
With that, our unit test is complete and we can give ourselves a pat on the back. But before I close off, there is one final key point that I feel needs to be covered.
Typically, unit tests take the form: arrange -> act -> assert
. If we read the test we just wrote from top to bottom, it seems like we are doing arrange -> act -> assert -> act
. What gives?
The last part of our test that runs is not the eventEmitter.emit(...)
line, but rather our assertion: expect(...).toHaveProperty(...)
. This is because, as soon as the await
keyword is encountered, the evaluation of the expression to its right β eventIterator.next()
β will be pushed onto the Microtask Queue. The main thread continues executing to the end, and only then can the result of the evaluated expression be processed.
The 2 code snippets below should help clarify this:
console.log("top");
(() => {
console.log("middle");
})();
console.log("bottom");
// logs "top", "middle", "bottom"
console.log("top");
(async () => {
console.log(await "middle");
})();
console.log("bottom");
// logs "top", "bottom", "middle"
Great care needs to be taken, not to be caught unawares by this behaviour. The following test passes even though the assertions are clearly wrong:
import { EventEmitter } from "events";
import { describe, expect, test } from "vitest";
import { makeEventListenerGenerator } from "./makeEventListenerGenerator";
describe("makeEventListenerGenerator", () => {
test("iterates over emitted events", () => {
const abortController = new AbortController();
const eventEmitter = new EventEmitter();
const eventName = "ratesUpdated";
const eventPayload = [{ from: "USD", to: "EUR", rate: 0.94 }];
const eventIterator = makeEventListenerGenerator({
abortController,
eventEmitter,
eventName,
})();
eventEmitter.emit(eventName, eventPayload);
(async () => {
expect(await eventIterator.next()).toHaveProperty("value", "false positive");
expect(false).toBe(true);
})();
});
});
Top comments (0)