DEV Community

Cover image for RxJS Pipelines
Evgeniy OZ
Evgeniy OZ

Posted on • Edited on • Originally published at Medium

RxJS Pipelines

To reveal the full power of observables, we can combine them using the pipe() method or creation operators.

There are multiple reasons why we might want to do this: computed variables, shared data resources, and side effects.

Computed Variables

Computed (or "derived") variables are computed from values of other variables.

Let's take an example from the ngx-collection source code:

public readonly isMutating$: Observable<boolean> = this.select(
  this.isCreating$,
  this.isUpdating$,
  this.isDeleting$,
  (isCreating, 
   isUpdating, 
   isDeleting
  ) => isCreating || isUpdating || isDeleting
);
Enter fullscreen mode Exit fullscreen mode

Here we are combining 3 other observables to get a new variable that we can also observe.

To stay hot, computed variables should be composed only of hot observables.

Later we can use this variable as a source value for another computed variable:

public readonly isProcessing$: Observable<boolean> = this.select(
  this.isMutating$, // 👈
  this.isReading$,
  this.refreshingItems$,
  (isMutating, isReading, refreshingItems) => isMutating || isReading || refreshingItems.length > 0
);
Enter fullscreen mode Exit fullscreen mode

Turns out, that isProcessing$ is computed not from 3 observables, but from 5, but we are reducing this complexity by reusing isMutating$ variable.

The best thing about observable computed variables - is they are always "up to date": if any of the source values will change, the computed variable will be instantly changed. 

That's what is called "reactivity" in programming (wiki link). Switching from imperative to reactive programming requires a noticeable mental shift, and doesn't happen over an hour - if you are used to program in imperative style (every book for beginners teaches us this way), it will take some time and effort to change your habits and your code.

But just imagine a reward: an application, where modification of some value will automatically update every part where this value is involved, and will not touch parts that don't depend on that value. Magic 🪄

Shared Data Resources

Some values can be computed, some values can only be fetched from external sources - user input, API endpoints, and files. 

When we need the user's name in multiple places, we give the user only one input field to input it, and we share this data with the other components where we need it.

When we need to fetch a list of users from the API endpoint, we would like to do it once, and not send a request in every component where we need this list.

Because of that, it's better when methods of your "services" return hot observables. Example:

private users$?: Observable<User[]>;

getUsers(): Observable<User[]> {
  if (!this.users$ ) {
    this.users$ = this.http.get('/users').pipe(
      // reset on 'error' or 'complete'
      finalize(() => this.users$ = undefined),
      // complete on cache invalidation
      takeUntil(this.cacheInvalidation$),
      // cache the last response for 10 minutes
      shareReplay({
        bufferSize: 1,
        refCount: false,
        windowTime: 600000
      })
    );
  }
  return this.users$;
}
Enter fullscreen mode Exit fullscreen mode

Caching might be more complicated, it's just a simple example: the last fetched response will be cached for 10 minutes, or until the cache invalidation event.

Now, because it's a hot observable, we can reuse it in computed variables, side effects, and templates.

Side Effects and Pipeline

One of the places where you will reuse a lot of observables is [side] effects.

Let's use an artificially complicated effect as an example, and explain, step by step, how pipes work in RxJS:

private readonly load = this.effect(_ => _.pipe(
  // Pipe #0
  concatLatestFrom(() => this.userId$),
  tap(() => this.patchState({loadingFollowers: true})),
  switchMap(([_, userId]) => this.api.getFollowers(userId).pipe(
    // Pipe #1
    tap((followers) => this.patchState({
      followers: followers, 
      loadingFollowersAvatars: true
    })),
    switchMap((followers) => {
      const requests = followers.map((follower) => this.getAvatar(follower.id).pipe(
        // Pipe #2
        tap((avatar) => this.setAvatarForUserId(follower.id, avatar)),
        timeout(5000),
        catchError(() => of(undefined)),
        defaultIfEmpty(undefined),
      ));

      return forkJoin(requests).pipe(
        // Pipe #3
        finalize(() => this.patchState({loadingFollowersAvatars: false})),
        map(() => followers.length)
      );
    }),
    tap((n) => console.log("Number of followers: ", n)),
    finalize(() => this.patchState({loadingFollowers: false}))
  ))
));
Enter fullscreen mode Exit fullscreen mode

After reading the code of this example you can see why it's really handy, when methods of your API service return observables, and not just responses or subscriptions.

What this code does:

  1. Every time this effect receives a new value (void)…
  2. We fetch the current user ID,
  3. For that ID we fetch the list of followers,
  4. Then we create a list of requests to load an avatar for every follower,
  5. and assign an avatar to the follower's ID;
  6. After avatars are assigned, we print to the console the number of followers;
  7. We modify the state of loading* fields to render the corresponding spinners.

getFollowers() and getAvatar() are GET requests, so in both cases, we can cancel the previous request when we got the new value, so we should use switchMap() for mapping values to observables.

Now please look at the numbers of the pipes in the code comments.

"Pipe #0", the main pipe, contains the sequence of operators, that will be applied to the observable, created by effect() method.

"Pipe #1" contains a chain of operators, applied to the observable, returned by getFollowers() method.

"Pipe #2" will be applied only to the observable, returned by getAvatar() method.

"Pipe #3" will be applied to the forkJoin() only.

It is an important fact, that the operators, provided to pipe(), will be applied only to the observable, which "created" that pipe.

Because of that, timeout(5000) will not be applied to the main observable (Pipe #0) or any other pipe except Pipe #2.

Because of the same fact, finalize() in Pipe #3 will only be called, when forkJoin() is completed, and finalize() from Pipe #1 will be called only when getFollowers() is completed or threw an error.

Also, please notice that the first operators in every pipe receive as a value exactly what was emitted by the observable of that pipe.

Pipe #1 was created for getFollowers() observable - the first operator in this pipe will receive followers as an argument.

Pipe #2 was created for getAvatar() observable - the first operator will receive avatar (and inside that part of the code we still have access to the follower variable, just because it's still in our scope).

Now the most difficult part to explain and understand (when you read it for the first time): every operator in the pipe returns a modified value, so the next operator will receive the result, returned by the previous operator. Value, returned by the last operator in sequence, will become the value of the observable for subscribers or sub-pipes.

Some operators, like tap(), just mirror the input observable (in other words, return the values without modifications). Other operators, like map(), return modified value.

Let's find why in the last tap() of that example we receive the numbers of followers:

  1. The previous operator is switchMap((followers)…),
  2. It returns an observable - forkJoin()
  3. and the last operator of that forkJoin() is map(() => followers.length),
  4. so the followers.length will be returned by switchMap((followers)…),
  5. and it will become the input value for the next operator: tap((n) => console.log…).

So when we use operators that modify values, we should care about what results we return to the next operator in the pipe, and what result is returned by the last operator.

Using this knowledge, you can create not only sophisticated, but also powerful pipelines in RxJS ;)


💙 If you enjoy my articles, consider following me on Twitter, and/or subscribing to receive my new articles by email.

🎩️ If you or your company is looking for an Angular consultant, you can purchase my consultations on Upwork.

Top comments (0)