Life is too short. When searching for something, we can’t afford to type a whole word or sentence in a search field, or filling all the fields then hitting our old keyboard’s half-broken enter key to finally be able to see the first results... or nothing at all because our search criteria were too restrictive.
Don’t look at me like that! We can probably agree that most of us, if not all, are used to features like typeahead and live search results. We get frustrated every time we have to submit a search form.
TL;DR:
- if you are using NestJS, you will need this interceptor,
- if you are not using NestJS then maybe you should,
- we have to think reactively, I agree that it can have a steep learning curve but think about the pleasure of sliding on the other side of the hill ⛷,
- we can and should use RxJS everywhere,
- we should use observables even for single value streams,
- we should not ignore observables teardown logic.
🚨 Reactive Programming & RxJS to the rescue
Implementing these kinds of features can be tricky, especially if developed from scratch and with an imperative approach. That’s when reactive programming and RxJS come to the rescue. In fact, RxJS provides the right tooling and operators to implement these features in a few lines.
RxJS is such a perfect fit for these scenarios that most courses and tutorials cover the live search topic. It helps understand both how reactive programming works and how it can easily solve some challenging issues.
That’s when we end up with this common recipe:
keywords$ = this.keywordsControl.valueChanges;
data$ = keywords$.pipe(
/* Wait for the user to stop typing for 100ms and emit last value. */
debounceTime(100),
/* Ignore identical successive values
* (e.g. user pastes the same value in the input). */
distinctUntilChanged(),
/* when new keywords are emitted, this unsubscribes from the previous
* search result (canceling the underlying http request)
* and subscribes to the new one. */
switchMap(keywords => this.search(keywords))
)
The illustration below might help you notice the difference between RxJS flattening strategies and the related operators:
but if it doesn't help, you should definitely check out the great work by my buddy Shai Reznik: https://medium.com/@shairez/a-super-ninja-trick-to-learn-rxjss-switchmap-mergemap-concatmap-and-exhaustmap-forever-88e178a75f1b
🐢 Hey Debounce! Stop bullying my low latency!
The problem is that you are probably investing a lot of energy and money in producing low latency architectures and APIs but all these efforts just vanish when we introduce the artificial latency created by the debounceTime
operator.
What if we just get rid of the debounce? We are using switchMap
after all, and unnecessary requests are immediately canceled.
Wait a second! What happens on the back-end? Is the back-end "work" interrupted by some voodoo magic? Or did we just trigger some crazy chaos where the back-end is working for nothing until it realizes that the consumer is not there anymore?
🐈 Here comes the big cat
In a few words, NestJS is THE feature-rich NodeJS framework.
Amongst its wealth of features, there is native support of observables. This is quite handy even if we respond with a single value and not a stream of values. In fact, the interesting observables property we are looking for here is cancelability.
🧨 Observables Teardown Logic
Observables are said cancelable because we can unsubscribe whenever we need to, and interrupt the work. Cancelation works thanks to the teardown logic function returned when creating an observable.
Here’s an example of wrapping setInterval
in an observable:
function interval(period) {
return new Observable(observer => {
let i = 0;
const handle = setInterval(() => observer.next(i++), period);
/* This is the teardown logic. */
return () => clearInterval(handle);
});
}
As you can see, the observer function given to the Observable
's constructor returns the teardown logic function that calls clearInterval
in order to cancel the tasks scheduled by setInterval
.
⚠️ This is exactly how you should NOT implement an interval.
This implementation is scheduler naive.
You should use interval
or timer
instead.
🧪 The experiment
For the experiment, I needed to run some slow CPU, filesystem and memory intensive work on the back-end for every request. The first idea that crossed my mind was reading a big text file line by line and matching every one of them against the given keywords.
It turned out that even with a 1GB file, it was still quite fast.
That’s when I thought that reading multiple small files should be more inefficient. I just needed to generate a directory with lots of files... but wait! What about using node_modules
directory 🤔
Bingo! It could not be worse and that is exactly what I needed.
The implementation looks something like this and as you can see, the teardown logic immediately stops crawling the directory and reading files when the observer unsubscribes.
function getFiles(directoryPath) {
return new Observable(observer => {
...
return () => walker.pause();
}
}
function readLines(filePath) {
return new Observable(observer => {
...
return () => reader.close();
}
}
function search(): Observable<Line[]> {
return getFiles(nodeModulesPath)
.pipe(
mergeMap(file => readLines(file)),
...
);
}
😔 The disappointment
In the animation below, we can observe high CPU usage and an exponential memory usage on the back-end and that canceling the requests, even the last one, doesn’t interrupt the work.
By diving a little bit in Nest’s source code, we can see that our observable is converted to a promise using toPromise
method. In fact, Nest has to adapt to frameworks like ExpressJS that don’t handle observables.
public async transformToResult(resultOrDeferred: any) {
if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
return resultOrDeferred.toPromise();
}
return resultOrDeferred;
}
🔍 Detecting request cancelation
In Nest, request objects are instances of NodeJS’ IncomingMessage
that trigger a close
event when the connection is closed or when the HTTP2 stream is closed.
If we can detect when the request is canceled, then we can interrupt the work in our RxJS response stream.
-
aborted
vsclose
:IncomingMessage
also triggers anaborted
event that you can ignore as it will probably be deprecated in the future. Cf. https://github.com/nodejs/node/issues/15456 & https://github.com/nodejs/node/issues/15525.
Nest has an interesting concept called interceptors:
- Interceptors have a set of useful capabilities which are inspired by the Aspect Oriented Programming (AOP) technique.
and it looks like this:
@Injectable()
export class NoopInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
return next.handle();
}
}
This makes it possible to write, in a single place, a function that:
- intercepts every incoming HTTP request,
-
listens to the request’s
close
event, - does something to interrupt the work.
One of the interesting properties of Nest interceptors, compared to Express middlewares for example, is that the next
parameter is not just a function that triggers the route function or the next middleware but it is an object with a handle
method that returns an Observable
.
Thanks to this feature, we can manipulate the response and the whole stream by adding operators to the given Observable
.
For instance, we can detect the request cancelation by listening to the close
event using RxJS's fromEvent
and interrupt the Observable
returned by the route handler using the takeUntil
operator.
The final interceptor should look like this:
@Injectable()
export class UnsubscribeOnCloseInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
if (context.getType() !== 'http') {
return next.handle();
}
const request = context.switchToHttp().getRequest() as Request;
const close$ = fromEvent(request, 'close');
return next.handle().pipe(takeUntil(close$));
}
}
Let's try it out!
As you can observe, thanks to the interceptor, canceling an HTTP request will automatically and almost immediately cancel the work by unsubscribing from the observable returned by the route handler.
This reduces CPU, memory and all resources usage and interrupts all the work even when the user simply closes the window.
🧠 Think reactive
The key takeaway here is that by adopting a reactive approach and using observables everywhere, we can easily benefit from observables cancelability and boost APIs performance with a generic interceptor.
MongoDB query cancelation
What if our data source was a database like MongoDB? Can we interrupt the query?
📻 Stay tuned for an upcoming post on this topic 😉
👨🏻🍳 Let me help you!
At Marmicode, we use our passion and experience in Web Development & eXtreme Programming to help you cook better apps, ship them fast and make you proud of your work.
We have the services you need:
- Code Review,
- Remote Consultations,
- Workshops,
- On-demand development billed by accepted points.
📨 kitchen at marmicode.io
🔗 Links
💻 Source code Nx monorepo with an Angular app, a NestJS API and custom CPU / Memory graphing app using Angular & GraphQL subscriptions.
🐦 @yjaaidi Stay tuned for more posts and upcoming workshops.
Top comments (0)