DEV Community

Cover image for RxJS 7 - Pipeable Operators
Barış BALLI
Barış BALLI

Posted on • Edited on

RxJS 7 - Pipeable Operators

Here are my detailed notes from the fantastic course created by Jurek Wozniak, called RxJS 7 and Observables: Introduction.

In order to understand the concept please consider buying the course yourself, these notes are simply subsidiaries

Pipeable Operators

Operator Stacking

Pipe operators make some operations before an event actually reaches to an observer.

[Event Source → operator 1 → operator 2 → operator 3 → …] → observer

Important thing here is observer only have access the last value after all those operators.

This works just like javascript array utility functions.

Note: Applying a pipeable operator creates a new observable with some additional logic, it doesn’t modify the existing observable.

filter

When a value passed to the filter function it will either pass to (other operators or observable) or it will be discarded.

Even though filter function filters notifications emitted by the next, it always passes error and complete

We can call pipe method from any observable and then we can call any operator we want.

import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

interface NewsItem {
  category: 'Bussiness' | 'Sports';
  content: string;
}

const newsFeed$ = new Observable<NewsItem>((subscriber) => {
  setTimeout(() => {
    subscriber.next({ category: 'Bussiness', content: 'A' });
  }, 1000);
  setTimeout(() => {
    subscriber.next({ category: 'Sports', content: 'B' });
  }, 3000);
  setTimeout(() => {
    subscriber.next({ category: 'Bussiness', content: 'C' });
  }, 4000);
  setTimeout(() => {
    subscriber.next({ category: 'Sports', content: 'D' });
  }, 5000);
  setTimeout(() => {
    subscriber.next({ category: 'Bussiness', content: 'E' });
  }, 6000);
});

newsFeed$
  .pipe(filter((item) => item.category === 'Sports'))
  .subscribe((item) => console.log(item));
Enter fullscreen mode Exit fullscreen mode

Image description

If we want we can extract only sport feed and use it in anywhere we want

import { Observable } from "rxjs";
import { filter } from "rxjs/operators";

interface NewsItem {
  category: 'Business' | 'Sports';
  content: string;
}

const newsFeed$ = new Observable<NewsItem>(subscriber => {
  setTimeout(() => 
    subscriber.next({ category: 'Business', content: 'A' }), 1000);
  setTimeout(() => 
    subscriber.next({ category: 'Sports', content: 'B' }), 3000);
  setTimeout(() => 
    subscriber.next({ category: 'Business', content: 'C' }), 4000);
  setTimeout(() => 
    subscriber.next({ category: 'Sports', content: 'D' }), 6000);
  setTimeout(() => 
    subscriber.next({ category: 'Business', content: 'E' }), 7000);
});

const sportsNewsFeed$ = newsFeed$.pipe(
  filter(item => item.category === 'Sports')
);

newsFeed$.subscribe(
  item => console.log(item)
);
Enter fullscreen mode Exit fullscreen mode

map

map function works really similar to js map function

import { forkJoin } from "rxjs";
// Mike is from New Delhi and likes to eat pasta.

import { ajax } from "rxjs/ajax";
import { map } from "rxjs/operators";

const randomFirstName$ = ajax<any>('https://random-data-api.com/api/name/random_name').pipe(
  map(ajaxResponse => ajaxResponse.response.first_name)
);

const randomCapital$ = ajax<any>('https://random-data-api.com/api/nation/random_nation').pipe(
  map(ajaxResponse => ajaxResponse.response.capital)
);

const randomDish$ = ajax<any>('https://random-data-api.com/api/food/random_food').pipe(
  map(ajaxResponse => ajaxResponse.response.dish)
);

forkJoin([randomFirstName$, randomCapital$, randomDish$]).subscribe(
  ([firstName, capital, dish]) =>
    console.log(`${firstName} is from ${capital} and likes to eat ${dish}.`)
);
Enter fullscreen mode Exit fullscreen mode

Image description

tap

tab operator works like a spy and allow us to make some side effect without interacting with the notifications.

Because of in map, the value we return becomes the input of the next chained function sometimes for debug purposes for example we can pass a tap() function in the middle and spy what is happening

import { of } from "rxjs";
import { filter, map, tap } from "rxjs/operators";

of(1, 7, 3, 6, 2).pipe(
  filter(value => value > 5),
  map(value => value * 2),
  tap({
    next: value => console.log('Spy:', value)
  }),
).subscribe(value => console.log('Output:', value));
Enter fullscreen mode Exit fullscreen mode

But of couse if we would use map function with curly brances and return values with return keyword, just before returning them we could also console log.

But keep in mid that tap operator is more than just a console log, it is there for making any side effects therefore it is quite a useful tool.

One more thing…

Starting from RxJS 7.3.0, the tap() operator can do even more. You can see when the Subscription starts and ends at the level of the tap() operator.

On top of the ‘next’, ‘error’ and ‘complete’ handlers which we have discussed above, the tap() operator introduces three more handlers:

  • subscribe — called when a new Subscription is made,
  • unsubscribe — called when the Subscription is closed by unsubscribing (calling unsubscribe); this will NOT be executed when ‘error’ or ‘complete’ is emitted,
  • finalize — called when the Subscription ends, no matter what the cause: it can be either unsubscribing, an error or a complete notification; all will cause the ‘finalize’ handler to be called.
...
  tap({
    subscribe: () => { console.log('New Subscription'); },
    unsubscribe: () => { console.log('Unsubscribed'); },
    finalize: () => { console.log('Subscription ended'); }
  }),
...
Enter fullscreen mode Exit fullscreen mode

debounceTime

debounceTime delays the emitted notifications and only emits the lastly emitted value after a certain threshold.

For example we have multiple notifications coming let’s say 5 notifications per second and we are only interested in with the last value. We can have such a scenario when we listen for input field, the user writes fastly and when the user stops for a bit we finally emit the value and work on it.

In this way we can avoid the excesive processing.

Image description

import { fromEvent } from 'rxjs';
import { map, debounceTime } from 'rxjs/operators';

const sliderInput = document.querySelector('input#slider');

fromEvent<any>(sliderInput, 'input')
  .pipe(
    debounceTime(500),
    map((event) => event.target['value'])
  )
  .subscribe((value) => console.log(value));
Enter fullscreen mode Exit fullscreen mode

Image description

In this example instead of emitting thousands of notifications it just emits 3, which is an amazing performance gain.

catchError

catchError function provides a fallback observable in case of error. It automatically pass next() and complete() functions it doesn’t interest with them.

If an error notification occurs instead of passing it to the next chained function it passes it to an observable.

And this observable maybe directly emits next() and complete() notifications, then those notifications are passed to the next chained functions.

import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

const failingHttpRequest$ = new Observable((subscriber) => {
  setTimeout(() => {
    subscriber.error(new Error('Timeout'));
  }, 3000);

  return () => console.log('Teardown');
});

console.log('App started');

failingHttpRequest$
  .pipe(
    catchError((error) =>
      of(`Fallback value and err message: ${error.message}`)
    )
  )
  .subscribe({
    next: (value) => console.log(value),
    error: (err) => console.log(err.message),
    complete: () => console.log('Completed'),
  });
Enter fullscreen mode Exit fullscreen mode

Image description

We see that error didn’t pass to the other chained functions instead, we just create a new observable with *of* and passed the next() function also the error message.

We can also see that after of function sends a completed notification it also emits a completed notification to our main pipe.

Sometimes we may want to hide the error message or not want to do anything when error occurs, for those cases rxjs has an EMPTY observable, this observable as the name suggests is empty and just emits complete() notification.

import { EMPTY, Observable, of } from "rxjs";
import { catchError } from "rxjs/operators";

const failingHttpRequest$ = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.error(new Error('Timeout'));
  }, 3000);
});

console.log('App started');

failingHttpRequest$.pipe(
  catchError(error => EMPTY)
).subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed')
});
Enter fullscreen mode Exit fullscreen mode

Flattening Operators

flattening operators work like catch error but for next notification.

Flattening operator react to a next notification by subscribing a new observable, as long as source emit new values flattening operator will keep subscribing to new observables.

Flattening Operators Static Example

In the flattening operators first source emit a value and with that value a new observable is getting created, and it also starts to emit new values, then if we have a complete notification coming from the newly created observable, they are not emitted to the main notifications.

let’s make an example with concatMap

import { Observable, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const source$ = new Observable((subscriber) => {
  setTimeout(() => subscriber.next('A'), 2000);
  setTimeout(() => subscriber.next('B'), 3000);
});

console.log('App has started');
source$
  .pipe(concatMap((value) => of(1, 2)))
  .subscribe((value) => console.log(value));
Enter fullscreen mode Exit fullscreen mode

Image description

As we can see we totally mapped the values ‘A’ and ‘B’ to those observables.

Even though for the static numbers, it is not a really efficient use case, if we can handle creation of hot observables etc. These flattening operators can be quite useful.

Flattening Operators - Dynamic HTTP Request

Let’s this time make a ***************concatMap*************** example that will send an http request with the value provided

import { fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { concatMap, map } from "rxjs/operators";

const endpointInput: HTMLInputElement = document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');

fromEvent(fetchButton, 'click').pipe(
  map(() => endpointInput.value),
  concatMap(value =>
    ajax(`https://random-data-api.com/api/${value}/random_${value}`)
  )
).subscribe(
  value => console.log(value)
);
Enter fullscreen mode Exit fullscreen mode

Image description

Flattening Operators - Error Handling #1

Even though the complete notifications are not passed to the main stream, the error notifications does!

So we need to handle tem carefully in order to not kill our main pipeline

import { EMPTY, fromEvent } from "rxjs";
import { ajax } from "rxjs/ajax";
import { catchError, concatMap, map } from "rxjs/operators";

const endpointInput: HTMLInputElement = document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');

fromEvent(fetchButton, 'click').pipe(
  map(() => endpointInput.value),
  concatMap(value =>
    ajax(`https://random-data-api.com/api/${value}/random_${value}`)
  ),
  catchError(() => EMPTY)
).subscribe({
  next: value => console.log(value),
  error: err => console.log('Error:', err),
  complete: () => console.log('Completed')
});
Enter fullscreen mode Exit fullscreen mode

Image description

We have a problem with this approach, even though we catch the error, we catch it in the main pipeline and convert it to a complete notification this also kills the main pipeline and we cannot keep searching.

Flattening Operators - Error Handling #2

In order to solve the above problem we need to chain catchError to the internal observable

import { EMPTY, fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, concatMap, map } from 'rxjs/operators';

const endpointInput: HTMLInputElement =
  document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');

fromEvent(fetchButton, 'click')
  .pipe(
    map(() => endpointInput.value),
    concatMap((value) =>
      ajax(`https://random-data-api.com/api/${value}/random_${value}`).pipe(
        catchError((error) => of(`Could not fetch data: ${error}`))
      )
    )
  )
  .subscribe({
    next: (value) => console.log(value),
    error: (err) => console.log('Error:', err),
    complete: () => console.log('Completed'),
  });
Enter fullscreen mode Exit fullscreen mode

Image description

Flattening Operators - Concurrency - concatMap

concatMap operators do not pass to the next observable until the first observable completes.

This can be a good feature since uncompleted observables would create a memory leak in long term.

concatMap always guarantees that observables will be executed in the entered order. And this is super important (think multiple httpRequests or mouse positions for example)

Flattening Operators - switchMap

switchMap provides us almost the same solution as the concatMap but with one big difference.

If for example a new value is emitted while the previous observable still didn’t provide complete notification, switchMap automatically cancels the previous observable and creates the new observable with the new value.

It creates unimportant memory leaks because program cleans this memory after the cancel operation

This way it stops memory leaks and also can act really fast, based on the situation.

Flattening Operators - mergeMap

mergeMap is also like concatMap, but it allows multiple open observables, it doesn’t cancel the previous observable like switchMap, it allows them to work concurrently,

and this is sometimes, the exact thing we need.

Let's keep in touch

Hey, thanks a lot if you read it so far.
I want you to keep in touch for more sweet content.
Please subscibe in Dev.to and other channels (🦸‍♂️ especially twitter)

Twitter 🐦 https://twitter.com/barisbll_dev
Linkedin 📎 https://www.linkedin.com/in/barisbll-dev/
Github 👨🏻‍💻 https://github.com/barisbll
Medium 📰 https://medium.com/@barisbll

Top comments (0)