Evgeniy OZ

Posted on • Edited on • Originally published at Medium

Dangers and Treasures of RxJS

In this article, I'll show you how to avoid known dangerous pitfalls, and how to use some useful tricks, while working with RxJS observables.

Dangerous Operators 🏴‍☠️

Some operators can cause infinite loops or make your observable behave in ways you do not expect. Let's start with the most dangerous.

combineLatestWith()

It's a very handy operator: usually, you pass it one or more observables and react whenever any of them emits.

 

// Mocks of the sources, just for illustration
const showNames$: Observable<boolean> = of(false);
const showEmails$: Observable<boolean> = of(false);
const users$ = of([{name: 'a', email: 'a@a'}]);

// Now let's create an observable 
// using `combineLatestWith()`

const usersList = users$.pipe(
  combineLatestWith(
    showNames$,
    showEmails$,
  ),
  // when `showNames$` OR `showEmails$` generate a new value,
  // we'll get latest values from all of them:
  map(([users, showNames, showEmails]) => {
    if (showEmails && showNames) {
      return users;
    }
    return users.map((user) => ({
      ...user,
      name: showNames ? user.name : '',
      email: showEmails ? user.email : ''
    }));
  })
);


If at least one of the observables passed to combineLatestWith() emits, you'll get the last emitted values from every observable. But this convenience comes with two dangerous pitfalls:

⚠️ #1: It will not emit until every observable has produced at least one value.

If you subscribe to a hot observable, you have no guarantee that you will receive any value at the moment of subscription.

Every observable passed to combineLatestWith() should be double-checked: you must be sure that it will emit at least one value.

If you are not sure about that, use startWith() and pass it a default value that your code expects and can handle. Otherwise, it's too easy to create an "eternally hanging" observable when combineLatestWith() is involved.
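As a minimal sketch of this pitfall (assuming RxJS 7; the `showNames$` subject and the user names are made up for illustration), here is the same pipeline with and without startWith():

```typescript
import { of, Subject } from 'rxjs';
import { combineLatestWith, map, startWith } from 'rxjs/operators';

// Illustrative sources: a cold list of users and a hot toggle
// that has not emitted anything yet.
const users$ = of(['alice', 'bob']);
const showNames$ = new Subject<boolean>();

const withoutDefault: string[][] = [];
const withDefault: string[][] = [];

// Stays silent until `showNames$` emits for the first time.
users$.pipe(
  combineLatestWith(showNames$),
  map(([users, showNames]) => (showNames ? users : [])),
).subscribe((list) => withoutDefault.push(list));

// `startWith(true)` supplies a default, so this one emits right away.
users$.pipe(
  combineLatestWith(showNames$.pipe(startWith(true))),
  map(([users, showNames]) => (showNames ? users : [])),
).subscribe((list) => withDefault.push(list));

// Snapshot taken before the toggle has ever emitted:
// { withoutDefault: 0, withDefault: 1 }
const emittedBeforeToggle = {
  withoutDefault: withoutDefault.length,
  withDefault: withDefault.length,
};

// Once the toggle emits, both pipelines produce a value.
showNames$.next(false);
```

The default passed to startWith() should be a value the rest of the pipeline already knows how to handle, otherwise you only move the problem downstream.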

⚠️ #2: If your code, while handling an emission, makes one of the observables passed to combineLatestWith() emit a new value, you'll create an infinite loop.

Example:

observable$.pipe(
  combineLatestWith(toggle$),
  map(([value, currentToggleValue]) => {
    if (value && currentToggleValue) {
      // `toggle$` is one of the combined sources, so this emission
      // triggers this callback again: infinite loop
      toggle$.next(true);
    }
  })
);

distinctUntilChanged()

A very handy operator, what could go wrong?

When you need to detect changes in strings, numbers, and other primitive types, everything is fine. But if you need to detect a change in an object, this operator might not work as you expect: by default it compares values with ===, so if the object reference remains the same and only some key or sub-key has changed, this operator will not notify you about the change.

In such cases, you can pass a "comparator" argument, or use the distinctUntilKeyChanged() operator.
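A small sketch of how these variants behave (assuming RxJS 7; the `User` shape and the `namesSeen` helper are invented for this example). Note one addition that is not in the text above: when the very same object is mutated, a comparator receives the same reference on both sides, so the sketch uses the second argument of distinctUntilChanged(), a key selector, to remember the extracted value instead.

```typescript
import { from, Observable } from 'rxjs';
import { distinctUntilChanged, distinctUntilKeyChanged } from 'rxjs/operators';

interface User { id: number; name: string; }

// Helper for this illustration: subscribe and collect the names that get through.
function namesSeen(source$: Observable<User>): string[] {
  const names: string[] = [];
  source$.subscribe((user) => names.push(user.name));
  return names;
}

// Emits the SAME object twice, mutating it in between.
const mutated$ = new Observable<User>((subscriber) => {
  const user: User = { id: 1, name: 'a' };
  subscriber.next(user);
  user.name = 'b';
  subscriber.next(user);
  subscriber.complete();
});

// Emits a NEW object each time; the second repeats the content of the first.
const users: User[] = [
  { id: 1, name: 'a' },
  { id: 1, name: 'a' },
  { id: 1, name: 'b' },
];
const fresh$ = from(users);

// ['a']: same reference, so the mutation goes unnoticed.
const missedChange = namesSeen(mutated$.pipe(distinctUntilChanged()));

// ['a', 'b']: the extracted name is remembered, not the object.
const caughtChange = namesSeen(mutated$.pipe(
  distinctUntilChanged(
    (previous: string, current: string) => previous === current,
    (user: User) => user.name,
  ),
));

// ['a', 'a', 'b']: every new reference counts as a change.
const everyReference = namesSeen(fresh$.pipe(distinctUntilChanged()));

// ['a', 'b']: only a different `name` counts as a change.
const byName = namesSeen(fresh$.pipe(distinctUntilKeyChanged('name')));
```

If your state is updated immutably (a new object on every change), distinctUntilKeyChanged() or a comparator is usually enough; the key selector matters mostly when objects are mutated in place.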

forkJoin()

You will mostly use it to run multiple requests in parallel and get the responses once all of them have completed.

There are three dangerous things about it:

⚠️ #1: As said in the documentation: "If any given observable errors at some point, forkJoin will error as well and immediately unsubscribe from the other observables".

In most cases, that's not what you want, but the workaround is quite easy: add catchError(() => of(someValue)) to every request:

const ob$ = forkJoin([
  req1.pipe(catchError(() => of(someValue))),
  req2.pipe(catchError(() => of(someValue))),
  //...
]);

// or, if you have requests in an array

const obFromArray$ = forkJoin(
  requests.map(req => req.pipe(catchError(() => of(someValue))))
);

However, do not return EMPTY from catchError(), because of the next point:

⚠️ #2: As said in the documentation: "…whenever any of the given observables completes without emitting any value, forkJoin will complete at that moment as well and it will not emit anything either, even if it already has some last values from other observables".

It doesn't happen often, but it still might happen (if some observable in the list returns EMPTY or otherwise completes without emitting). In your app it might create quite a nasty bug: it will look like the app doesn't respond and nothing happens.

To avoid it, you can use defaultIfEmpty():

const ob$ = forkJoin(
  requests.map(req => req.pipe(
    catchError(() => of(someValue)),
    defaultIfEmpty(someValue)
  ))
);

⚠️ #3: As said in the documentation: "if there is an observable that never completes, forkJoin will never complete either".

It is quite possible for HTTP (XHR) requests to hang. In this case, an easy solution is the timeout() operator.
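One possible way to wire timeout() into forkJoin() is sketched below (assuming RxJS 7; `forkJoinWithTimeout`, its parameters and the sample values are hypothetical names for this illustration, not an existing API):

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { catchError, timeout } from 'rxjs/operators';

// Hypothetical helper: run requests in parallel. A request that stays
// silent for longer than `ms` milliseconds makes `timeout()` throw,
// and `catchError()` replaces it (or any other failure) with `fallback`.
function forkJoinWithTimeout<T>(
  requests: Observable<T>[],
  ms: number,
  fallback: T,
): Observable<T[]> {
  return forkJoin(
    requests.map((req) => req.pipe(
      timeout(ms),
      catchError(() => of(fallback)),
    )),
  );
}

// Usage with two stand-in "requests" that answer immediately.
const results: string[][] = [];
forkJoinWithTimeout([of('users'), of('settings')], 5000, 'n/a')
  .subscribe((responses) => results.push(responses));
```

Because catchError() here swallows every error, not only timeouts, a real application would probably want to log the error or inspect it (for example, check for TimeoutError) before substituting the fallback.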

With other (non-HTTP, hot) observables, there is a chance that you subscribe after the event you are waiting for has already been emitted. There is no common solution for such cases.

takeUntil()

The dangers of this operator are easy to avoid: it should be the last operator in the sequence, with two exceptions: it should come before shareReplay({refCount: false}) and before aggregating operators (for example, toArray() or reduce()).

It might sound a bit tedious, but you can just delegate control to the linter and forget about it.

You can find more information in this article.
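To see why the position matters, here is a small sketch (assuming RxJS 7; all names are made up) where the only difference between the two pipelines is whether takeUntil() comes before or after switchMap():

```typescript
import { Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

// Returns the values that still arrive AFTER `destroy$` has fired.
function valuesAfterDestroy(takeUntilLast: boolean): string[] {
  const source$ = new Subject<number>();
  const inner$ = new Subject<string>(); // a long-lived inner stream
  const destroy$ = new Subject<void>();
  const received: string[] = [];

  const pipeline$ = takeUntilLast
    ? source$.pipe(switchMap(() => inner$), takeUntil(destroy$))
    : source$.pipe(takeUntil(destroy$), switchMap(() => inner$));

  pipeline$.subscribe((value) => received.push(value));

  source$.next(1);     // switchMap() subscribes to inner$
  destroy$.next();     // "the component is destroyed"
  inner$.next('late'); // should be ignored from now on

  return received;
}

const leaked = valuesAfterDestroy(false); // ['late']: inner subscription survived
const safe = valuesAfterDestroy(true);    // []: everything was unsubscribed
```

When takeUntil() sits before switchMap(), it only completes the outer source; the inner subscription created by switchMap() stays alive until the inner observable completes on its own.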

Careless usage of switchMap()

If you have read my previous article, you know the difference between switchMap(), concatMap(), mergeMap() and exhaustMap(). But, as it turns out, not everyone has read that article, and people carelessly use switchMap() in places where it might cause undesired effects.

That doesn't mean switchMap() is bad or flawed: it's a very useful operator, people just misuse it.
Read my previous article and use the correct operators ;)
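As a quick illustration of one such undesired effect (a sketch assuming RxJS 7; the "save" requests are simulated with hand-driven subjects, and every name is hypothetical), compare what happens to the first request when a second click arrives before it finishes:

```typescript
import { Subject } from 'rxjs';
import { concatMap, switchMap } from 'rxjs/operators';

// Two clicks arrive back to back; each click starts a "save" request
// that we answer by hand, first request 1 and then request 2.
function simulateSaves(useSwitchMap: boolean): string[] {
  const log: string[] = [];
  const clicks$ = new Subject<number>();
  const requests = new Map<number, Subject<string>>();

  const startRequest = (id: number) => {
    const request$ = new Subject<string>();
    requests.set(id, request$);
    return request$;
  };

  // Answers a request if it was ever started.
  const respond = (id: number) => {
    const request$ = requests.get(id);
    if (request$) {
      request$.next(`saved ${id}`);
      request$.complete();
    }
  };

  const flatten = useSwitchMap ? switchMap(startRequest) : concatMap(startRequest);
  clicks$.pipe(flatten).subscribe((result) => log.push(result));

  clicks$.next(1);
  clicks$.next(2); // arrives while request 1 is still pending
  respond(1);
  respond(2);

  return log;
}

const withSwitchMap = simulateSaves(true);  // ['saved 2']: request 1 was dropped
const withConcatMap = simulateSaves(false); // ['saved 1', 'saved 2']
```

Dropping the previous request is exactly what you want for things like search-as-you-type, and exactly what you don't want for writes, where every request matters.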

Unbound Methods

People are lazy, so we sometimes write code like this:

ob$.pipe(
  catchError(this.errHandler)
);

It will work only as long as this.errHandler() does not touch this, because this inside the handler will not be what you expect.

It's pretty easy to avoid:

ob$.pipe(
  catchError(() => this.errHandler())
);
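Here is a self-contained sketch of what goes wrong (assuming RxJS 7 and strict-mode code such as an ES module; the `Loader` class and its members are invented for the example):

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

class Loader {
  private readonly fallback = 'fallback value';

  // Needs `this` to read `fallback`.
  private errHandler(): Observable<string> {
    return of(this.fallback);
  }

  loadUnbound(request$: Observable<string>): Observable<string> {
    // The method is passed as a bare function, so `this` is lost
    // when RxJS calls it.
    return request$.pipe(catchError(this.errHandler));
  }

  loadBound(request$: Observable<string>): Observable<string> {
    // The arrow function keeps `this` pointing at the Loader instance.
    return request$.pipe(catchError(() => this.errHandler()));
  }
}

const failing$ = throwError(() => new Error('network down'));
const loader = new Loader();
const outcomes: string[] = [];

// Builds an observer that records what happened under the given label.
const record = (label: string) => ({
  next: (value: string) => outcomes.push(`${label}: ${value}`),
  error: (err: unknown) =>
    outcomes.push(`${label}: ${err instanceof TypeError ? 'TypeError' : 'other error'}`),
});

loader.loadUnbound(failing$).subscribe(record('unbound'));
loader.loadBound(failing$).subscribe(record('bound'));
// outcomes: ['unbound: TypeError', 'bound: fallback value']
```

In the unbound case the handler itself throws while reading `this.fallback`, so the original error is replaced by a TypeError, which can be confusing to debug.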

Treasures 💎

takeUntil()

This operator lets you avoid memory leaks in Angular components: just add it (last, as noted) to the list of operators before you subscribe() and the issue is solved!

It's the simplest and most convenient solution.

There is a super-handy linter for that.

You do not need this operator if you subscribe using the async pipe.
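A minimal, framework-free sketch of this pattern (assuming RxJS 7; `TickerComponent` is a stand-in for a real Angular component, and `ngOnDestroy()` is called by hand here, whereas Angular would call it for you):

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

class TickerComponent {
  readonly received: number[] = [];
  private readonly destroy$ = new Subject<void>();

  constructor(ticks$: Observable<number>) {
    ticks$.pipe(
      takeUntil(this.destroy$), // last operator before subscribe()
    ).subscribe((tick) => this.received.push(tick));
  }

  // Angular calls this lifecycle hook when the component is destroyed.
  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const ticks$ = new Subject<number>();
const component = new TickerComponent(ticks$);

ticks$.next(1);
component.ngOnDestroy();
ticks$.next(2); // ignored: the subscription ended on destroy

// component.received is [1]
```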

finalize()

When composing a sequence of operators, if you think "after all of that ends, we should do this", then you might want to use finalize().

It is much more reliable than adding some code to subscribe(), map(), tap(), or catchError(): the finalize() callback will be executed even if the observable completes without emitting any values, and also when it errors or is unsubscribed. In some cases, it's pure gold.

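ob$.pipe(
  tap(() => this.patchState({loading: true})),
  exhaustMap(() => this.api.createNewSomething().pipe(
    // attached to the request itself, so it runs when the request
    // completes, errors, or is unsubscribed
    finalize(() => this.patchState({loading: false}))
  ))
);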
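The following sketch (assuming RxJS 7; the `run` helper is made up for the illustration) shows that the finalize() callback fires in all three cases, while tap() and the next handler fire only when a value is actually emitted:

```typescript
import { EMPTY, Observable, of, throwError } from 'rxjs';
import { finalize, tap } from 'rxjs/operators';

// Subscribes to `source$` and records, in order, everything that happens.
function run(source$: Observable<number>): string[] {
  const log: string[] = [];
  source$.pipe(
    tap(() => log.push('tap')),
    finalize(() => log.push('finalize')),
  ).subscribe({
    next: (value) => log.push(`next ${value}`),
    error: () => log.push('error'),
    complete: () => log.push('complete'),
  });
  return log;
}

const withValue = run(of(1));
// ['tap', 'next 1', 'complete', 'finalize']

const withoutValue = run(EMPTY);
// ['complete', 'finalize']

const withError = run(throwError(() => new Error('boom')));
// ['error', 'finalize']
```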

first()

Use this operator when you expect an observable to emit only one value (like an HTTP request), but you can't guarantee it. Usually, this happens in functions, methods, or effects that accept an observable as an argument.

Without this operator, very curious side effects and bugs might appear if the observable starts emitting multiple values when your code is not ready for that.
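A short sketch of that situation (assuming RxJS 7; `handleOnce` is a hypothetical function that accepts an observable it does not control):

```typescript
import { EMPTY, EmptyError, Observable, of } from 'rxjs';
import { first } from 'rxjs/operators';

// Hypothetical consumer that is written for exactly one value.
function handleOnce(result$: Observable<string>): string[] {
  const handled: string[] = [];
  result$.pipe(first()).subscribe({
    next: (value) => handled.push(value),
    // first() reports an EmptyError if the source completes with no value
    error: (err: unknown) =>
      handled.push(err instanceof EmptyError ? 'EmptyError' : 'other error'),
  });
  return handled;
}

const fromChattySource = handleOnce(of('a', 'b', 'c')); // ['a']
const fromEmptySource = handleOnce(EMPTY);              // ['EmptyError']
```

Keep the second case in mind: unlike take(1), first() turns an empty source into an error, so you either handle that error or pass a default value as the second argument of first().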

throttleTime()

If you have ever tried to implement this logic: "accept the first emitted value, start some side effect, then ignore any subsequent values for N milliseconds, and then, if a new value was emitted during that time, emit it too", then throttleTime() will save your day.

It is similar to debounceTime(), but its config argument has leading and trailing parameters, which allow you to create more sophisticated behaviors.

Example of the logic mentioned above:

input$.pipe(
  throttleTime(250, undefined, {leading: true, trailing: true}),
  distinctUntilChanged(),
  switchMap((input) => this.api.loadItems(input))
);

tapResponse()

The last in this list is a third-party operator (not from RxJS).
You can take it from the @ngrx/component-store library.

This operator will help you not to forget to handle error cases:

ob$.pipe(
  tapResponse(
    (result) => this.handle(result),
    // the next argument is required, so you will not forget it
    (err) => console?.error(err)
  )
);

If you know any other dangers and treasures of RxJS - share them in the comments!


💙 If you enjoy my articles, consider following me on Twitter, and/or subscribing to receive my new articles by email.

🎩️ If you or your company is looking for an Angular consultant, you can purchase my consultations on Upwork.
