Update
Shout out to Andrei Gatej for an additional improvement that I've added as a final iteration below.
Let me start off by saying that there are always going to be exceptions, and you should not refactor your current Angular source code because of this post. Rather, take my notes into consideration and keep improving your style.
Observables and Subscriptions are a big part of the Angular 2+ web framework. The theory behind them can be a bit confusing at first, and many Angular developers avoid Observables unless absolutely necessary. In my own experience, I've encountered production code bases where Observables were avoided like the plague and badly coerced into behaving in incorrect ways. This led to memory-intensive applications that ran like a turtle after eating at a buffet.
Learning more about the RxJS API and Observables can lead to leaner applications that express and achieve a lot more in less code. Also, the Angular team recommends the smart use of Observables where possible.
Rather than go into an abstract discussion about Observables and Streams and Functional Programming and all that jazz, I'm going to illustrate a common mistake made by many developers when starting out with RxJS and Angular 2+. The example is a simple numeric counter.
Below is a naive implementation which demonstrates quite a few initial misconceptions about RxJS and Angular. If you've been working with Angular for a while, you've probably encountered code similar to this, maybe even written it yourself. Don't worry, this is a no-shame zone.
naive-counter.service.ts
import { Injectable } from "@angular/core";
import { ReplaySubject } from "rxjs";
import { first } from "rxjs/operators";

@Injectable()
export class NaiveCounterService {
  counter$ = new ReplaySubject<number>(1);

  constructor() {
    this.counter$.next(0);
  }

  increment(): void {
    this.counter$
      .pipe(first())
      .subscribe((counter: number) => this.counter$.next(counter + 1));
  }

  decrement(): void {
    this.counter$
      .pipe(first())
      .subscribe((counter: number) => this.counter$.next(counter - 1));
  }
}
naive-counter.component.ts
import { Component, OnInit } from "@angular/core";
import { NaiveCounterService } from "./naive-counter.service";

@Component({
  selector: "app-naive-counter",
  template: `
    Counter: <strong>{{ counter }}</strong>
    <button (click)="naiveCounterService.increment()">Increment</button>
    <button (click)="naiveCounterService.decrement()">Decrement</button>
  `
})
export class NaiveCounterComponent implements OnInit {
  counter: number;

  constructor(readonly naiveCounterService: NaiveCounterService) {}

  ngOnInit(): void {
    this.naiveCounterService.counter$.subscribe(
      (counter: number) => (this.counter = counter)
    );
  }
}
Let's identify every area of improvement. The biggest red flag, in my opinion, is that the component creates a subscription which is never disposed of when the component unmounts. This leads to memory leaks, as the subscription can keep running until the user navigates away from the web application. Even worse, if the component is mounted and unmounted repeatedly, each life cycle adds another long-running subscription.
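To make the leak concrete, here is a minimal, dependency-free sketch. It does not use RxJS or Angular: `TinySubject`, `mountLeaky`, and `mountAndDestroy` are hypothetical stand-ins I made up for a long-lived subject in a service and for a component's life cycle. It only illustrates how observers pile up when nothing unsubscribes.

```typescript
// Hypothetical stand-in for a long-lived RxJS subject held by a service.
class TinySubject<T> {
  private observers = new Set<(value: T) => void>();

  // Registers an observer and returns a teardown function,
  // playing the role of Subscription.unsubscribe().
  subscribe(observer: (value: T) => void): () => void {
    this.observers.add(observer);
    return () => {
      this.observers.delete(observer);
    };
  }

  next(value: T): void {
    this.observers.forEach((observer) => observer(value));
  }

  get observerCount(): number {
    return this.observers.size;
  }
}

// Simulates a component that subscribes on init and never cleans up.
function mountLeaky(source: TinySubject<number>): void {
  source.subscribe(() => {
    /* update the view */
  });
}

// Simulates a component that unsubscribes when it is destroyed.
function mountAndDestroy(source: TinySubject<number>): void {
  const unsubscribe = source.subscribe(() => {
    /* update the view */
  });
  unsubscribe(); // what ngOnDestroy, or the async pipe, would take care of
}

const leakySource = new TinySubject<number>();
const cleanSource = new TinySubject<number>();

// Mount each simulated component three times.
for (let i = 0; i < 3; i++) {
  mountLeaky(leakySource);
  mountAndDestroy(cleanSource);
}

console.log(leakySource.observerCount); // 3 observers still registered
console.log(cleanSource.observerCount); // 0 observers left
```

Every leaked observer also keeps whatever it closes over alive, which in a real application would typically be the destroyed component instance.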
The simplest way to solve this issue, and perhaps the most pragmatic, is to bind the observable itself to the template using the async pipe. Let's refactor.
naive-counter.component.ts
import { Component } from "@angular/core";
import { NaiveCounterService } from "./naive-counter.service";

@Component({
  selector: "app-naive-counter",
  template: `
    Counter: <strong>{{ naiveCounterService.counter$ | async }}</strong>
    <button (click)="naiveCounterService.increment()">Increment</button>
    <button (click)="naiveCounterService.decrement()">Decrement</button>
  `
})
export class NaiveCounterComponent {
  constructor(readonly naiveCounterService: NaiveCounterService) {}
}
With this improvement, responsibility for managing the subscription moves from the application developer to Angular. The async pipe, as demonstrated above, subscribes to the observable and updates the UI with each notification received. Once the component unmounts, the async pipe unsubscribes from the observable automatically. The risk of long-lived subscriptions has been removed.
The above design illustrates a common pattern in Angular: providing long-lived state for a component, or series of components, that will mount and unmount repeatedly throughout the application's usage. You should definitely not create long-lived state where you don't really need it, as it is a major cause of memory consumption growing over time.
The next step to improving this design is in the service. Here it is once again.
naive-counter.service.ts
import { Injectable } from "@angular/core";
import { ReplaySubject } from "rxjs";
import { first } from "rxjs/operators";

@Injectable()
export class NaiveCounterService {
  counter$ = new ReplaySubject<number>(1);

  constructor() {
    this.counter$.next(0);
  }

  increment(): void {
    this.counter$
      .pipe(first())
      .subscribe((counter: number) => this.counter$.next(counter + 1));
  }

  decrement(): void {
    this.counter$
      .pipe(first())
      .subscribe((counter: number) => this.counter$.next(counter - 1));
  }
}
The first issue to address is the increment() and decrement() methods. The original implementation attempts to read the last emitted notification by subscribing through the first() operator, which completes the subscription once a single notification is received. Further, this implementation relies on a ReplaySubject in order to receive a cached value immediately after subscribing.
The simplest solution is to replace the ReplaySubject with a BehaviorSubject, which includes an initial value and a getValue() method that returns the cached value synchronously. This solution, however, misses the point.
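For reference, the getValue() shortcut would look roughly like the sketch below. To keep it runnable without dependencies, `TinyBehaviorSubject` is a hypothetical, stripped-down stand-in for the RxJS `BehaviorSubject` (it only models the initial value, `getValue()`, `next()`, and `subscribe()`), and `GetValueCounterService` is a name I made up for illustration.

```typescript
// Hypothetical, minimal stand-in for RxJS's BehaviorSubject.
class TinyBehaviorSubject<T> {
  private current: T;
  private observers = new Set<(value: T) => void>();

  constructor(initial: T) {
    this.current = initial;
  }

  // Returns the cached value synchronously.
  getValue(): T {
    return this.current;
  }

  next(value: T): void {
    this.current = value;
    this.observers.forEach((observer) => observer(value));
  }

  // New observers immediately receive the cached value.
  subscribe(observer: (value: T) => void): () => void {
    this.observers.add(observer);
    observer(this.current);
    return () => {
      this.observers.delete(observer);
    };
  }
}

// The "simplest solution": read the cached value, then push the new one.
class GetValueCounterService {
  readonly counter$ = new TinyBehaviorSubject<number>(0);

  increment(): void {
    this.counter$.next(this.counter$.getValue() + 1);
  }

  decrement(): void {
    this.counter$.next(this.counter$.getValue() - 1);
  }
}

const service = new GetValueCounterService();
const seen: number[] = [];
service.counter$.subscribe((value) => seen.push(value));

service.increment();
service.increment();
service.decrement();

console.log(seen); // values received: 0, 1, 2, 1
```

It works, but the calculation still lives outside the stream: the service reads state imperatively and pushes the result back in.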
The current design doesn't treat calls to the increment() and decrement() methods as notifications that affect the stream. For this reason, the calculation of the next notification falls outside of the stream, and the stream itself is only used to push a value to its observers. A better approach is to replace the ReplaySubject with a multicasting observable.
The publishBehavior(initialNotification) operator converts the observable into a ConnectableObservable. This ConnectableObservable begins emitting notifications when its connect() method is called. To avoid this additional step, the refCount() operator can be added right after it. The result connects as soon as a single observer subscribes and unsubscribes from the source once no observers remain subscribed. Further, it re-subscribes to the source if a new observer subscribes and the source has not completed. (Note that RxJS 7 marks both operators as deprecated, so this iteration applies as written to RxJS 6.)
With a bit of refactoring, the service becomes:
naive-counter.service.ts
import { Injectable } from "@angular/core";
import { Subject } from "rxjs";
import { scan, publishBehavior, refCount } from "rxjs/operators";

@Injectable()
export class NaiveCounterService {
  private change$ = new Subject<number>();

  readonly counter$ = this.change$.pipe(
    scan((counter: number, change: number) => counter + change),
    publishBehavior(0),
    refCount()
  );

  increment(): void {
    this.change$.next(1);
  }

  decrement(): void {
    this.change$.next(-1);
  }
}
The above design has replaced the ReplaySubject with a reference-counted ConnectableObservable that behaves similarly to a BehaviorSubject: it has an initial notification, specifically 0, and the last notification is immediately emitted to each new subscriber. The connection to the source is managed automatically by the refCount() operator, and subscribers still receive every subsequent value from the stream.
The increment() and decrement() methods now act as proxy methods for dispatching notifications on the change$ stream, which is a Subject. Finally, the state of the counter is calculated from each notification from change$ and the previously emitted notification.
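Conceptually, scan() is a running reduce: it keeps an accumulated state and emits the new state after every incoming value. The sketch below shows the same idea over a plain array instead of a stream. `scanArray` is a hypothetical helper written for this illustration, not part of RxJS, and unlike the seedless scan() call in the service it takes an explicit seed.

```typescript
// Hypothetical array-based analogue of the scan operator:
// returns every intermediate state, not just the final one.
function scanArray<S, T>(
  values: T[],
  reducer: (state: S, value: T) => S,
  seed: S
): S[] {
  const states: S[] = [];
  let state = seed;
  for (const value of values) {
    state = reducer(state, value);
    states.push(state);
  }
  return states;
}

// Each +1 is an increment() call, each -1 a decrement() call.
const changes = [1, 1, -1, 1];
const totals = scanArray(changes, (counter: number, change: number) => counter + change, 0);

console.log(totals); // running totals: 1, 2, 1, 2
```

The reducer is the same `(counter, change) => counter + change` function used in the service; only the source of the changes differs.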
One problem that might arise from this design is caused by the refCount() operator. If all observers unsubscribe, the subject will unsubscribe from the source. This is fine in most cases, but one caveat is that there won't be a cached value for a new observer. The next observer to subscribe will have to wait until the source emits a new notification to receive its first notification. This could be a problem if the UI is trying to mount again with the last notification as its current state.
A BehaviorSubject and the asObservable() method provide an even better solution. Here is the latest iteration:
naive-counter.service.ts
import { Injectable } from "@angular/core";
import { scan } from "rxjs/operators";
import { BehaviorSubject } from "rxjs";

@Injectable()
export class NaiveCounterService {
  private readonly change$ = new BehaviorSubject<number>(0);

  readonly counter$ = this.change$
    .asObservable()
    .pipe(scan((counter: number, change: number) => counter + change));

  increment(): void {
    this.change$.next(1);
  }

  decrement(): void {
    this.change$.next(-1);
  }
}
The above improvement relies on the BehaviorSubject to receive and cache new notifications. The asObservable() method returns an observable that conceals the BehaviorSubject so that external actors cannot push notifications into the stream. The stream logic is attached to the returned observable.
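One thing worth checking in this iteration is what a late subscriber sees. As far as I understand RxJS, each subscription to a piped observable gets its own scan() state, and a seedless scan() emits the first value it receives as-is. Since the BehaviorSubject caches the last change rather than the running total, a component that mounts after a few clicks may start from the wrong number. The dependency-free sketch below models that behavior under those assumptions; `TinyBehaviorSubject` and `subscribeToRunningSum` are hypothetical stand-ins, so verify against the real library before relying on it.

```typescript
// Hypothetical, minimal stand-in for RxJS's BehaviorSubject.
class TinyBehaviorSubject<T> {
  private current: T;
  private observers = new Set<(value: T) => void>();

  constructor(initial: T) {
    this.current = initial;
  }

  next(value: T): void {
    this.current = value;
    this.observers.forEach((observer) => observer(value));
  }

  // New observers immediately receive the cached (last pushed) value.
  subscribe(observer: (value: T) => void): () => void {
    this.observers.add(observer);
    observer(this.current);
    return () => {
      this.observers.delete(observer);
    };
  }
}

// Models `change$.pipe(scan((counter, change) => counter + change))`:
// every subscription owns a fresh accumulator, and without a seed the
// first value received becomes the initial state.
function subscribeToRunningSum(
  source: TinyBehaviorSubject<number>,
  observer: (total: number) => void
): () => void {
  let hasState = false;
  let total = 0;
  return source.subscribe((change) => {
    total = hasState ? total + change : change;
    hasState = true;
    observer(total);
  });
}

const change$ = new TinyBehaviorSubject<number>(0);
const early: number[] = [];
const late: number[] = [];

subscribeToRunningSum(change$, (total) => early.push(total));
change$.next(1); // increment()
change$.next(1); // increment()
change$.next(1); // increment()

// A second observer arrives after three increments.
subscribeToRunningSum(change$, (total) => late.push(total));

console.log(early); // values received: 0, 1, 2, 3
console.log(late); // values received: 1 (the last change, not the total 3)
```

If late subscribers matter, one option might be to share the accumulated stream, for example by adding shareReplay(1) after scan(), or to store the running total itself in the BehaviorSubject.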
Top comments (1)
Interesting approach!
One thing that seems a little counterintuitive to me is the fact that EventEmitter is used for something other than its inherent purpose: emitting events for parent components.
Otherwise, I agree that the stream logic should be computed through pipeable operators.
I think you can achieve the same by using a BehaviorSubject and exposing it as an observable: