DEV Community

David Dal Busco
David Dal Busco

Posted on • Originally published at daviddalbusco.Medium

Angular: The Single Rule To Get Better At RxJS


Even though there is no strict rule, combining imperative and declarative programming concepts with RxJS might be making the development harder and the code less clean.

Shifting mind from a (often) default way of programming (i.e. imperative) to a reactive (i.e. declarative) approach, takes time.

However, there is a single rule that can help you make the switch:

⚠️ Do not subscribe, period ⚠️


Confused about the difference between imperative and declarative? Have a look to the excellent article of Josh Morony that compares both.


Why?

The rule is not strict, it is a guideline, like the code of the pirates of the Caribbean 🏴‍☠️.

It does not mean that you should never ever subscribe to a stream but rather, that you should try to avoid it. In that way, according my experience, you are going to slowly transform the imperative way of programming into more declarative concepts.

Concretely, while developing features in components, trying to use mostly the Angular | async pipe that automatically unsubscribe when components are destroyed, can in addition to avoid memory leak, help improve the coding style.

To explore such modus operandi, let’s refactor an Angular application that mix imperative and declarative programming concepts.


Starting Point

The following demo uses the coinpaprika API to display a list of cryptocurrencies to its user.

The code source is available on GitHub. Each following chapters (steps 1, 2, 3 and 4) are separate branches.

It defers the HTTP queries to a coins.service and presents the results in a coins.component.


Service

The provider acts as a store. It queries the list of cryptos, filter the results and persists these in memory.

The function list() is both reactive, by telling what it wants (httpClient.get), and imperative by checking and filtering the results.

import { Injectable, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService implements OnDestroy {
  constructor(private httpClient: HttpClient) {}

  private coins: Coin[] = [];

  private destroy$: Subject<void> = new Subject();

  list() {
    this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(takeUntil(this.destroy$))
      .subscribe((allCoins: Coin[]) => {
        if (allCoins.length > 10) {
          this.coins = allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          );
        }
      });
  }

  getCoins(): Coin[] {
    return this.coins;
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Enter fullscreen mode Exit fullscreen mode

Component

The component initializes the service and exposes a getter binding to parse the results to the UI.

import { Component, OnInit } from '@angular/core';
import { CoinsService } from '../coins.service';

@Component({
  selector: 'app-coins',
  templateUrl: './coins.component.html',
  styleUrls: ['./coins.component.css']
})
export class CoinsComponent implements OnInit {
  constructor(private readonly coinsService: CoinsService) {}

  ngOnInit(): void {
    this.coinsService.list();
  }

  get coins() {
    return this.coinsService.getCoins();
  }
}
Enter fullscreen mode Exit fullscreen mode

Template

The HTML list the coins.

<article *ngFor="let coin of coins">
  <h1>{{ coin.name }}</h1>
  <p>Symbol: {{ coin.symbol }}</p>
  <p>Rank: {{ coin.rank }}</p>
  <hr />
</article>
Enter fullscreen mode Exit fullscreen mode

Step 1: (More) Declarative

Even though I said above that the rule is actually a guideline, I would suggest anyway to never subscribe in services, respectively to be more strict about memory leaks.


As we do not want to subscribe, we have first to transform the method that is called by the component to return an Observable .

list(): Observable<Coin[]> {
  return this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    ...
}
Enter fullscreen mode Exit fullscreen mode

Without any other changes, the compiler will warn you about the return values that are not matching (as we are still subscribing to the stream and therefore are actually returning a Subscription). That’s why, we replace the subscribe with an RxJS operator. In the particular case we are using tap because we still want to assign the result to the store.

list(): Observable<Coin[]> {
  return this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    .pipe(
      tap((allCoins: Coin[]) => {
        if (allCoins.length > 10) {
          this.coins = allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          );
        }
      }),
      takeUntil(this.destroy$))
}
Enter fullscreen mode Exit fullscreen mode

Because we are now not subscribing anymore, we can remove the takeUntil and let the caller handles the way it streams the data.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  private coins: Coin[] = [];

  list(): Observable<Coin[]> {
    return this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(
        tap((allCoins: Coin[]) => {
          if (allCoins.length > 10) {
            this.coins = allCoins.filter(
              (coin: Coin) =>
                !coin.is_new && coin.rank > 0 && coin.rank < 100
            );
          }
        })
      );
  }

  getCoins(): Coin[] {
    return this.coins;
  }
}
Enter fullscreen mode Exit fullscreen mode

The code has already become cleaner, no more subscription and destroy lifecycle but, the code is still mixing different approaches. That’s why we take advantages of RxJS filter and map operators to make it more reactive.

list(): Observable<Coin[]> {
  return this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    .pipe(
      filter((allCoins: Coin[]) => allCoins.length > 10),
      map((allCoins: Coin[]) =>
        allCoins.filter(
          (coin: Coin) =>
            !coin.is_new && coin.rank > 0 && coin.rank < 100
        )
      ),
      tap((topCoins: Coin[]) => (this.coins = topCoins))
    );
}
Enter fullscreen mode Exit fullscreen mode

The imperative if has become a reactive filter and the array.filter has been moved to a map transformer. Thanks to these last modifications the data sources flow through the stream that describe what we want as a results.


Step 2: Subscribe In Component

Even though the code still compiles, at this point no currencies are displayed anymore because no caller is using, is subscribing, to the stream.

As we are proceeding iteratively, we basically reproduce what we removed in the service earlier, we subscribe within the component.

import { Component, OnDestroy, OnInit } from '@angular/core';

import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { CoinsService } from '../coins.service';
@Component({
  selector: 'app-coins',
  templateUrl: './coins.component.html',
  styleUrls: ['./coins.component.css']
})
export class CoinsComponent implements OnInit, OnDestroy {
  constructor(private readonly coinsService: CoinsService) {}

  private destroy$: Subject<void> = new Subject<void>();

  ngOnInit(): void {
    this.coinsService
      .list()
      .pipe(takeUntil(this.destroy$))
      .subscribe(() => {});
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }

  get coins() {
    return this.coinsService.getCoins();
  }
}
Enter fullscreen mode Exit fullscreen mode

I know, I said “never subscribe”, this ain’t the end 😉. Nevertheless, we notice that the cryptos are listed again.


Step 3: Async Pipe

To reach our ultimate goal, we want to remove the subscription in the component in order to leverage the | async pipe. Therefore, we have to improve our service. On the other hand, we still want it to act as a store.

That’s why, as an intermediate step, we replace the imperative state coins of the service with a BehaviorSubject, a special type of Observable that allows values to be multicasted to many Observers (source), and exposes it streams publicly as a readonly Observable variable.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { BehaviorSubject, Observable } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  private coins: BehaviorSubject<Coin[]> = new BehaviorSubject<
    Coin[]
  >([]);

  readonly coins$: Observable<Coin[]> = this.coins.asObservable();

  list(): Observable<Coin[]> {
    return this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(
        filter((allCoins: Coin[]) => allCoins.length > 10),
        map((allCoins: Coin[]) =>
          allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          )
        ),
        tap((topCoins: Coin[]) => this.coins.next(topCoins))
      );
  }
}
Enter fullscreen mode Exit fullscreen mode

In comparison to our previous changes, this is breaking. That’s why we have to adapt the component to remove the getter and replace it with an observable we can ultimately use in the template.

import { Component, OnDestroy, OnInit } from '@angular/core';

import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

import { Coin, CoinsService } from '../coins.service';

@Component({
  selector: 'app-coins',
  templateUrl: './coins.component.html',
  styleUrls: ['./coins.component.css']
})
export class CoinsComponent implements OnInit, OnDestroy {
  constructor(private readonly coinsService: CoinsService) {}

  private destroy$: Subject<void> = new Subject<void>();

  coins$: Observable<Coin[]> = this.coinsService.coins$;

  ngOnInit(): void {
    this.coinsService
      .list()
      .pipe(takeUntil(this.destroy$))
      .subscribe(() => {});
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Enter fullscreen mode Exit fullscreen mode

Finally, we introduce the famous async pipe.

<article *ngFor="let coin of coins$ | async">
Enter fullscreen mode Exit fullscreen mode

Step 4: No Subscribe And Reactive

Our current solution is really close to comply with the goals, we are using a stream to get the data and to display the results but, we still have to subscribe to trigger the loading the currencies.

That’s why we try to remove the subject.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  readonly coins$: Observable<Coin[]> = ... // <- TODO

  list(): Observable<Coin[]> {
    return this.httpClient
      .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
      .pipe(
        filter((allCoins: Coin[]) => allCoins.length > 10),
        map((allCoins: Coin[]) =>
          allCoins.filter(
            (coin: Coin) =>
              !coin.is_new && coin.rank > 0 && coin.rank < 100
          )
        )
      );
  }
}
Enter fullscreen mode Exit fullscreen mode

We notice the exposed observable, coins$, is now lacking a source.

On the other hand, we still have the stream that process the flow of the data as we except.

Yes, that’s right, we connect both.

readonly coins$: Observable<Coin[]> = this.httpClient
  .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
  .pipe(
    filter((allCoins: Coin[]) => allCoins.length > 10),
    map((allCoins: Coin[]) =>
      allCoins.filter(
        (coin: Coin) =>
          !coin.is_new && coin.rank > 0 && coin.rank < 100
      )
    )
  );
Enter fullscreen mode Exit fullscreen mode

However, doing so, we do loose the state management feature we had in place thanks to the use of the BehaviorSubject. That’s why we introduce a shareReplay that will also replay values, that will also make our service acts as a store.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';

import { Observable } from 'rxjs';
import {filter, map, shareReplay} from 'rxjs/operators';

export type Coin = Record<string, string | number | boolean>;

@Injectable({
  providedIn: 'root'
})
export class CoinsService {
  constructor(private httpClient: HttpClient) {}

  readonly coins$: Observable<Coin[]> = this.httpClient
    .get<Coin[]>(`https://api.coinpaprika.com/v1/coins`)
    .pipe(
      filter((allCoins: Coin[]) => allCoins.length > 10),
      map((allCoins: Coin[]) =>
        allCoins.filter(
          (coin: Coin) =>
            !coin.is_new && coin.rank > 0 && coin.rank < 100
        )
      ),
      shareReplay({ bufferSize: 1, refCount: true })
    );
}
Enter fullscreen mode Exit fullscreen mode

If you never used shareReplay before, be careful when using it. Read more in the blog post of Kwinten Pisman.

Finally, we can remove our last subscription in the component and also all linked code that has for goal to handle the un-subscription.

import { Component } from '@angular/core';

import { Observable } from 'rxjs';

import { Coin, CoinsService } from '../coins.service';

@Component({
  selector: 'app-coins',
  templateUrl: './coins.component.html',
  styleUrls: ['./coins.component.css']
})
export class CoinsComponent {
  constructor(private readonly coinsService: CoinsService) {}

  readonly coins$: Observable<Coin[]> = this.coinsService.coins$;
}
Enter fullscreen mode Exit fullscreen mode

If you compare to its original version, has not the component become really slim and easy to understand?

A last check to the GUI.

All cryptos are still listed, the code is reactive and, we are not using any “subscribe” anymore 🥳.


Summary

Trying to not subscribe using RxJS in Angular is not a finality nor a strict rule but, when applied as a guideline, can help make the code cleaner and reactive, can help with the experience and time get better at RxJS.

To infinity and beyond!

David


You can reach me on Twitter or my website.

Give a try to DeckDeckGo for your next presentations!

DeckDeckGo

Discussion (4)

Collapse
christiankozalla profile image
Christian Kozalla

Starting with Angular and Rxjs I was struggling with exactly this concept. Mixing imperative and declarative code, being unsure when and where to subscribe..

I've read Josh's article earlier and I like the mental model comparing an observable stream to an assembly line 💡

What I'd like to take home from your article, David, are

  • Return observables from services
  • Process stream with rxjs operators inside service as much as possible
  • Subscribe to observable inside the template using the async pipe whenever possible

Thank you for these awesome articles👍

Collapse
daviddalbusco profile image
David Dal Busco Author

You read my mind Christian! Thank you for the summary, that's exactly what I tried to communicate 🙏.

Collapse
talr98 profile image
TalR98

Seems like more of preference choice. As long as you remember to unsubscribe with ngOnDestroy properly, all good

Collapse
daviddalbusco profile image
David Dal Busco Author • Edited

Well yes and no.

Agree that as long you unsubscribe correctly and avoid memory leaks, mixing both imperative and declarative works out too, no question about this.

My point is that programming imperatively with RxJS, as quoted in the article of Josh I am mentioning, goes against the grain and might be making things harder.

Therefore the idea that trying to never subscribe, slowly adapting to a reactive approach that fits better the RxJS concept and therefore makes ultimately things easier.