Let's say that you have to extract 100M objects from a database, transform them, and then load them into another storage system.
Problems arise as soon as writing to the second database becomes slower than reading from the first. Depending on the implementation, you could face one of these issues:
- extracted data stacks up in your memory, and your program crashes because of the memory usage;
- you send too many requests in parallel to your target database;
- your program is slow because you process each page of data in sequence.
At Forest Admin, we recently faced this issue when moving data from a PostgreSQL database to Elasticsearch.
These problems can be addressed by processing data in streams that support backpressure. Backpressure lets the stream process data at the pace of the slowest asynchronous step in the chain.
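To make the idea concrete before bringing in any library, here is a minimal plain JavaScript sketch of backpressure. The names `pumpWithBackpressure`, `fakeSource` and `sleep` are hypothetical helpers invented for this illustration, and error handling is left out on purpose. The reader is only asked for the next item once the writer has a free slot, so the slow side sets the pace.

```javascript
// Hypothetical helper for this sketch: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical source: an async generator that yields the numbers 1..count.
// A generator is pull-based: it only produces a value when asked for one.
async function* fakeSource(count) {
  for (let i = 1; i <= count; i++) {
    await sleep(1);
    yield i;
  }
}

// Pulls items from `source` and hands them to `write`, never keeping more
// than `limit` writes in flight. Assumption: `write` returns a promise.
async function pumpWithBackpressure(source, write, limit) {
  const inFlight = new Set();
  for await (const item of source) {
    // The callback runs later, once `task` has been assigned.
    const task = write(item).finally(() => inFlight.delete(task));
    inFlight.add(task);
    // Backpressure: stop pulling from the source until a slot frees up.
    if (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
  }
  // Wait for the last writes to finish.
  await Promise.all(inFlight);
}
```

For instance, `pumpWithBackpressure(fakeSource(20), write, 3)` would write 20 items with at most 3 writes running at once. Unlike the RxJS version in this article, this sketch reads the source sequentially.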
RxJS is a great streaming library, but it does not natively support backpressure, and it's not easy to find examples. So, I decided to share one.
Let's illustrate with an example
Let's fake the extract method just for the purpose of this article:
async function extract(pageSize, page) {
  // Just fake an async network access that
  // resolves after at most 100ms
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
  // Return an empty page once all 100M objects have been served
  if (pageSize * (page - 1) >= 100_000_000) {
    return [];
  }
  return new Array(pageSize)
    .fill()
    .map((_, index) => ({
      id: pageSize * (page - 1) + index + 1,
      label: `Random label ${Math.random()}`,
      title: `Random title ${Math.random()}`,
      value: Math.random(),
      createdAt: new Date()
    }));
}
The transform method could be asynchronous, but that's not useful in this example.
function transform(i) { return i; }
And now, let's fake the load method:
async function load(items) {
  // Let's fake an async network access that takes
  // max 150ms to write all the items
  await new Promise((resolve) =>
    setTimeout(resolve, Math.random() * 150)
  );
}
Example of backpressure in RxJS
Backpressure is ensured by the BehaviorSubject named drain in the example below. You'll see that the code pushes data concurrently to the target database, with a limit of 5 requests in parallel.
Input data is also loaded concurrently, but this time the pace is regulated by the drain subject. Every time a page is sent to the target database, we allow another one to be extracted.
const { BehaviorSubject } = require('rxjs');
const { mergeMap, map, tap, filter } = require('rxjs/operators');
async function extractTransformLoad() {
  const CONCURRENCY = 5;
  const PAGE_SIZE = 1000;
  // This allows us to load a fixed number
  // of pages from the beginning
  const drain = new BehaviorSubject(
    new Array(CONCURRENCY * 2).fill()
  );
  return drain
    // This is necessary because the observable
    // streams arrays. This allows us to push
    // a fixed number of pages to load from
    // the beginning
    .pipe(mergeMap(v => v))
    // Values inside the arrays don't really matter
    // we only use values indices to generate page
    // numbers
    .pipe(map((_, index) => index + 1))
    // EXTRACT
    .pipe(mergeMap((page) => extract(PAGE_SIZE, page)))
    // Terminate if it was an empty page = the last page
    .pipe(tap((results) => {
      if (!results.length) drain.complete();
    }))
    .pipe(filter(results => results.length))
    // TRANSFORM: apply transform to every item of the page
    .pipe(map((items) => items.map(transform)))
    // LOAD, with at most CONCURRENCY requests in parallel
    .pipe(mergeMap(load, CONCURRENCY))
    // Just make sure to not keep results in memory
    .pipe(map(() => undefined))
    // When a page has been processed, allow to extract
    // a new one
    .pipe(tap(() => {
      drain.next([undefined]);
    }))
    .toPromise();
}
In the example above, we set the concurrency to 5, meaning that 5 requests can be sent to the target database at the same time. To reduce the time spent waiting for new data, the BehaviorSubject named drain makes sure that twice as many pages of data (10) are being extracted or waiting to be loaded.
In this example,
- memory holds at most 10 pages of data at any time;
- the processing will be as fast as possible with the maximum concurrency that we defined;
- only 5 queries can be made in parallel to the target database.
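One way to check bounds like these on your own pipeline is to wrap the asynchronous functions with a small counter. The sketch below is only an illustration: `trackConcurrency` is a hypothetical helper, not part of RxJS, and it assumes the wrapped function returns a promise.

```javascript
// Hypothetical helper (not part of RxJS): wraps an async function and
// records how many calls are running at the same time.
function trackConcurrency(fn) {
  const stats = { calls: 0, running: 0, maxRunning: 0 };
  const tracked = async (...args) => {
    stats.calls += 1;
    stats.running += 1;
    stats.maxRunning = Math.max(stats.maxRunning, stats.running);
    try {
      return await fn(...args);
    } finally {
      // Runs whether the wrapped call succeeded or failed.
      stats.running -= 1;
    }
  };
  return { tracked, stats };
}
```

Assuming you build `const { tracked, stats } = trackConcurrency(load)` and pass `tracked` instead of `load` to `mergeMap(load, CONCURRENCY)`, then `stats.maxRunning` should never exceed 5 once the pipeline has finished.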