DEV Community

Cover image for RxJS 7 - Creation Functions
Barış BALLI
Barış BALLI

Posted on

RxJS 7 - Creation Functions

Here are my detailed notes from the fantastic course created by Jurek Wozniak, called RxJS 7 and Observables: Introduction.

In order to understand the concept please consider buying the course yourself, these notes are simply subsidiaries

Creation Functions

Until now we always created the observable internal logic manually, but this is not at ideal approach since we usually have quite complicated observable logic.

Rxjs comes with many built-in creation functions. Creation functions create an observable with certain popular behaviour. We already saw an ajax function that makes an http request inside.

of - How Creation Functions Work

*of*, creation function takes an array of values, then emits all those values with next(), and when it comes to the last item, after emitting with next() it completes the observable with complete()

import { of } from 'rxjs';

of('Alice', 'Ben', 'Charlie').subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Completed'),
});
Enter fullscreen mode Exit fullscreen mode

We can also easily make our own custom of function

import { Observable, of } from "rxjs";

ourOwnOf('Alice', 'Ben', 'Charlie').subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed')
});

function ourOwnOf(...args: string[]): Observable<string> {
  return new Observable<string>(subscriber => {
    for(let i = 0; i < args.length; i++) {
      subscriber.next(args[i]);
    }
    subscriber.complete();
  });
}
Enter fullscreen mode Exit fullscreen mode

from

from works like *of*, but instead of having seperate values from receive one array, promise or an iterable.

When it takes a promise if it resolves from calls next with resolved value, if it receives rejects from calls error with rejected value.

And of course as always at the end emits a complete notification (also with promises.)

import { from } from "rxjs";

const somePromise = new Promise((resolve, reject) => {
  // resolve('Resolved!');
  reject('Rejected!');
});

const observableFromPromise$ = from(somePromise);

observableFromPromise$.subscribe({
  next: value => console.log(value),
  error: err => console.log('Error:', err),
  complete: () => console.log('Completed')
});
Enter fullscreen mode Exit fullscreen mode

We can also pass observables to from (even though it creates an observable with it 😀 )

fromEvent

fromEvent is a creation function that emits events from various event sources.

Some of those event sources

  • Dom EventTarget
  • Node.js Event Emitter
  • jQuery Events

for example for the *******************fromEvent(button, click()),*******************

subscribe() will work like addEventListener()

unsubscribe() will work like removeEventListener()

Because actually rxjs will use those methods for us

import { fromEvent } from 'rxjs';

const triggerButton = document.querySelector('button#trigger');

fromEvent<PointerEvent>(triggerButton, 'click').subscribe((event) =>
  console.log(event.type, event.x, event.y)
);
Enter fullscreen mode Exit fullscreen mode

Image description

If we want to build it ourselves we could do it like this (with additional setTimeout)

import { fromEvent, Observable } from "rxjs";

const triggerButton = document.querySelector('button#trigger');

// const subscription = fromEvent<MouseEvent>(triggerButton, 'click').subscribe(
//   event => console.log(event.type, event.x, event.y)
// );

const triggerClick$ = new Observable<MouseEvent>(subscriber => {
  const clickHandlerFn = event => {
    console.log('Event callback executed');
    subscriber.next(event);
  };

  triggerButton.addEventListener('click', clickHandlerFn);

  return () => {
    triggerButton.removeEventListener('click', clickHandlerFn);
  };
});

const subscription = triggerClick$.subscribe(
  event => console.log(event.type, event.x, event.y)
);

setTimeout(() => {
  console.log('Unsubscribe');
  subscription.unsubscribe();
}, 5000);
Enter fullscreen mode Exit fullscreen mode

We have to return a cleanup function to leave the used resource (in our case event listener), in order to avoid memory leaks

As we can see rxjs creation functions make event listening much easier.

timer

timer works as an setTimeout function inside an observable

import { timer } from 'rxjs';

timer(200).subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Completed'),
});
Enter fullscreen mode Exit fullscreen mode

Image description

As we can see it automatically emits the value 0 after finishes, and calls complete we can also see what happens inside

import { Observable, timer } from "rxjs";

console.log('App started');

const timer$ = new Observable<number>(subscriber => {
  const timeoutId = setTimeout(() => {
    console.log('Timeout!');
    subscriber.next(0);
    subscriber.complete();
  }, 2000);

  return () => clearTimeout(timeoutId);
});

const subscription = timer$.subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed')
});

setTimeout(() => {
  subscription.unsubscribe();
  console.log('Unsubscribe');
}, 1000);
Enter fullscreen mode Exit fullscreen mode

interval

interval works just like the timer, this time instead of receiving just the value 0, we have in interval values starting from 0 and incremented by one every once in a dedicated interval time.

forkJoin - Handle Multiple Http Calls

forkJoin works different than other functions we talked previously.

forkJoin takes multiple observables and wait until ***********************************all the observables emit their value.*********************************** After receiving all the values from those observables forkJoin emits an array of responses.

This can be handy if we want to make multiple http requests and make an operation just after receiving all the answers.

import { forkJoin } from 'rxjs';

import { ajax } from 'rxjs/ajax';

const randomName$ = ajax('https://random-data-api.com/api/name/random_name');

const randomNation$ = ajax(
  'https://random-data-api.com/api/nation/random_nation'
);

const randomFood$ = ajax('https://random-data-api.com/api/food/random_food');

// randomName$.subscribe((ajaxResponse) =>
//   console.log(ajaxResponse.response.first_name)
// );
// randomNation$.subscribe((ajaxResponse) =>
//   console.log(ajaxResponse.response.capital)
// );
// randomFood$.subscribe((ajaxResponse) =>
//   console.log(ajaxResponse.response.dish)
// );

forkJoin([randomName$, randomNation$, randomFood$]).subscribe(
  ([nameAjax, nationAjax, foodAjax]) => console.log(`${nameAjax.response.first_name} is from ${nationAjax.response.capital} and likes to eat ${foodAjax.response.dish}.`)
);
Enter fullscreen mode Exit fullscreen mode

If we would subscribe to each observable ourselves it would emit values in random order,

Image description

Thanks to forkJoin we can have all those values at once and format them to a specific format like this.

Image description

But let’s say we have such an observable that calls next multiple times before actually completes the action in that case what will be returned?

In that case the last values emitted via next before the complete notification are returned as response. And all the previous values emitted before those last ones are discarded.

And what happens when an error notificatios is emitted by one of the observables in the forkJoin function? Of course it automatically ends the subscription with error

import { forkJoin, Observable } from "rxjs";

const a$ = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next('A');
    subscriber.complete();
  }, 5000);

  return () => {
    console.log('A teardown');
  };
});

const b$ = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.error('Failure!');
  }, 3000);

  return () => {
    console.log('B teardown');
  };
});

forkJoin([a$, b$]).subscribe({
  next: value => console.log(value),
  error: err => console.log('Error:', err)
});
Enter fullscreen mode Exit fullscreen mode

Image description

We can see that teardown function is called as soon as error notification is received in both of the observables.

Combine Latest - Reacting to Multiple Input Changes

combineLatest also takes an array of observables but instead of emitting just the last values before complete notification, it emits a new array of resolved values each time one of the observables emit a value (starting with first set of all resolved values)

Image description

import { combineLatest, fromEvent } from "rxjs";

const temperatureInput = document.getElementById('temperature-input');
const conversionDropdown = document.getElementById('conversion-dropdown');
const resultText = document.getElementById('result-text');

const temperatureInputEvent$ = fromEvent(temperatureInput, 'input');
const conversionInputEvent$ = fromEvent(conversionDropdown, 'input');

combineLatest([temperatureInputEvent$, conversionInputEvent$]).subscribe(
  ([temperatureInputEvent, conversionInputEvent]) => {
    const temperature = Number(temperatureInputEvent.target['value']);
    const conversion = conversionInputEvent.target['value'];

    let result: number;
    if (conversion === 'f-to-c') {
      result = (temperature - 32) * 5/9;
    } else if (conversion === 'c-to-f') {
      result = temperature * 9/5 + 32;
    }

    resultText.innerText = String(result);
  }
);
Enter fullscreen mode Exit fullscreen mode

Image description

As soon as we entered another value from one of two input fields above we will have a new emitted value.

Let's keep in touch

Hey, thanks a lot if you read it so far.
I want you to keep in touch for more sweet content.
Please subscibe in Dev.to and other channels (🦸‍♂️ especially twitter)

Twitter 🐦 https://twitter.com/barisbll_dev
Linkedin 📎 https://www.linkedin.com/in/barisbll-dev/
Github 👨🏻‍💻 https://github.com/barisbll
Medium 📰 https://medium.com/@barisbll

Top comments (0)