loading...
Cover image for Understanding Multicasting Observables in Angular
Bitovi

Understanding Multicasting Observables in Angular

likeomgitsfeday profile image Jennifer Wadella Updated on ・6 min read

Many times in Angular application development we'll have an Observable, and want to use the values from that Observable to do different things in the UI.

Let's imagine we're building this interface that shows information about a fish, and we want to show users a schedule of when that fish is available based on the hemisphere of the world selected.

Fish Display UI

In our component we'll get the response of an HTTP request to the animal crossing API. We're using HTTPClient which returns an Observable. We want to display data from that HTTP request in our UI so a user can see information about the fish, but we also want to display a custom built schedule based on that data and input from something else.

The API returns an object that looks something like this:

{
   "id":10,
   "fileName":"killifish",
   "name":{
      "name-en":"killifish",
      ...
   },
   "availability":{
      "month-northern":"4-8",
      "month-southern":"10-2",
      "time":"",
      "isAllDay":true,
      "isAllYear":false,
      "location":"Pond",
      "rarity":"Common"
   },
   "shadow":"Smallest (1)",
   "price":300,
   "priceCj":450,
   "catchPhrase":"I caught a killifish! The streams are safe again.",
   "museumPhrase":"Ah, the magnificent killifish! Did you know there are over 1,000 different species? My own favorite killifish species are the mangrove, which can live on land for weeks, breathing air! And the mummichog, the first fish ever brought to space. I wonder if the killifish you've brought me is related to either those intrepid explorers?",
}

We want to get the availability based on the hemisphere(northern or southern) the user cares about, and display the months during which that fish is available, by creating an array like this:

[
   {
      "month":"January",
      "available":false
   },
   {
      "month":"February",
      "available":true
   },
   ...
]

We might consider doing something like this (note we are using the Async pipe in our component template to subscribe to fish$):

// fish.component.ts 

  public fish$: Observable<Fish &{uiSchedule: Schedule}> ;
  public selectedHemi = new BehaviorSubject<'northern' | 'southern'>('northern');
  public displayedSchedule$: Observable<Month[]>;

  constructor(private route: ActivatedRoute, private acnhService: AcnhService) { }

ngOnInit(): void {
    this.fish$ = this.route.paramMap.pipe(
      switchMap((params: ParamMap) => {
        return this.acnhService.getFish(params.get('id')).pipe(map((res: Fish) => {
          return {
            ...res,
            uiSchedule: {
              // mapping function to generate array of months with key of 
              // whether month is available or not
              northern: buildSchedule(res.availability, 'northern'),
              southern: buildSchedule(res.availability, 'southern')
            }
          }
        }));
      }),
    )

    this.displayedSchedule$ = this.selectedHemi.pipe(
      withLatestFrom(this.fish$),
      map(([selectedHemi, fish]) => {
        return fish.uiSchedule[selectedHemi];
      })
    )
  }
// fish.component.html 
<mat-card *ngIf="fish$ | async as fish" color="secondary">
  <mat-card-header>
    <mat-card-title>{{fish.name['name-en']}}</mat-card-title>
    <mat-card-subtitle>{{fish.price | currency }}</mat-card-subtitle>
  </mat-card-header>
  <mat-card-content class="row">
    <div>
      <img src="{{fish.imageUrl}}" alt="{{fish.name['name-en']}}">
      <blockquote class="museum-phrase">"{{fish.museumPhrase}}"</blockquote>
    </div>
    <div>
      <mat-button-toggle-group name="hemisphere" [value]="selectedHemi | async" aria-label="Hemisphere" color="primary" (change)="selectedHemi.next($event.value)">
        <mat-button-toggle value="northern">Northern Hemisphere</mat-button-toggle>
        <mat-button-toggle value="southern">Southern Hemisphere</mat-button-toggle>
      </mat-button-toggle-group>
      <div class="table display-availability">
        <div class="month" *ngFor="let month of displayedSchedule$ | async"  [ngClass]="{'available':month.available}">
          {{month.month}}
        </div>
      </div>
      <div *ngIf="fish.availability.isAllDay;else limitedHours">
        <p>The {{fish.name['name-en']}} is available at all times</p>
      </div>
      <ng-template #limitedHours>
        <p>The {{fish.name['name-en']}} is available from {{fish.availability.time}}</p>
      </ng-template>
    </div>
  </mat-card-content>
</mat-card>

This will give us a displayedSchedule$ Observable with an array that displays either the northern or southern hemisphere schedule when the value of selectedHemi changes. Again, assume that we're using the Async pipe in our template to subscribe to this Observable because we want the tear down functionality of our Angular component to handle unsubscribing for us.

But by doing this we're creating an additional subscription to fish$ when we subscribe to displayedSchedules, which means our Observable is being executed twice, quite unnecessarily. Not to mention rude, this awesome developer built a great free API indexing Animal Crossing stuff, and we're thoughtlessly hitting it twice? Ruuuuuude. (ps. how many of ya'll have been doing something like this without even realizing?)

How can we avoid this?

Instead of an Observable, we can use a Subject instead. Subjects can have multiple subscribers and only execute their context once. To convert an Observable to a Subject we can use the multicast operator.

The multicast operator is a bit of a bear to understand - it takes a selector as a parameter and according to the docs returns

"An Observable that emits the results of invoking the selector on the
items emitted by a ConnectableObservable that shares a single
subscription to the underlying stream."

A more palatable summary from the docs is

"A multicasted Observable uses a Subject under the hood to make multiple Observers see the same Observable execution. Multicast returns an Observable that looks like a normal Observable, but works like a Subject when it comes to subscribing. multicast returns a ConnectableObservable, which is simply an Observable with the connect() method.

The connect() method is important to determine exactly when the shared Observable execution will start. Because connect() does source.subscribe(subject) under the hood, connect() returns a Subscription, which you can unsubscribe from in order to cancel the shared Observable execution.

So let's pipe the multicast operator to source Observable fish$ with a new ReplaySubject (because we want late subscribers to get the value).


On the Subject of Subjects ...

subject - a special type of Observable that allows values to be multicasted to many Observers

behaviorSubject - a subject that can 'store' a current value that new subscribers will receive

replaySubject - a subject than can send old values to new subscribers


 this.fish$ = this.route.paramMap.pipe(
      switchMap((params: ParamMap) => {
        return this.acnhService.getFish(params.get('id')).pipe(map((res: Fish) => {
          return {
            ...res,
            uiSchedule: {
              northern: buildSchedule(res.availability, 'northern'),
              southern: buildSchedule(res.availability, 'southern')
            }
          }
        }));
      }),
      multicast(new ReplaySubject(1))
    )

... now we have nothing displaying in our UI? Why? We still have the async pipe subscribing to fish$, but fish$ is now a ConnectableObservable, and we must call the connect method on it to trigger our source observables execution.

// RxJS source code

function Multicast() {
   ...
   return <ConnectableObservable<R>> connectable;
}

export class ConnectableObservable<T> extends Observable<T>{
   ...  
   connect(): Subscription {
    let connection = this._connection;
    if (!connection) {
      this._isComplete = false;
      connection = this._connection = new Subscription();
      connection.add(this.source
        .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
      if (connection.closed) {
        this._connection = null;
        connection = Subscription.EMPTY;
      }
    }
    return connection;
  }

  refCount(): Observable<T> {
    return higherOrderRefCount()(this) as Observable<T>;
  }
  ...
}
this.fish$.connect()

However, this means we must also remember to unsubscribe from that subscription created by the connect method, so doesn't that defeat the purpose of using the async pipe? Yep. Boo. BUT, fear not, gentle reader, we can use the refCount operator, instead of having to manage the connect method ourselves.

refCount makes the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.

RefCount returns an Observable that keeps track of how many subscribers it has, it will start executing when subscribers is more than 0, and stops when subscribers are 0 again. This means when we use our async pipe on fish$, the count will become 1, when we use our async pipe on displayedSchedule$ the count will become 2 and when our component is destroyed and the async pipes unsubscribe, the count will go back to 0.

Our final code looks something like this

    this.fish$ = this.route.paramMap.pipe(
      switchMap((params: ParamMap) => {
        return this.acnhService.getFish(params.get('id')).pipe(map((res: Fish) => {
          return {
            ...res,
            uiSchedule: {
              northern: buildSchedule(res.availability, 'northern'),
              southern: buildSchedule(res.availability, 'southern')
            }
          }
        }));
      }),
      multicast(new ReplaySubject(1)),
      refCount()
    )

In summary, when we have an Observable we'd like to use a source for various purposes without executing its context every time, we can use the multicast operator to take a Subject and use it to share the source execution of our source Observable. The multicast operator returns a ConnectableObservable type, on which we CAN use the connect method to create the subscription to our source Observable(the HTTP request to get a fish). A more manageable approach is to use the refCount operator which will count subscriptions and call the connect method to subscribe to the source Observable once the first subscription is created and run tear down logic when the subscription count returns to 0 (AKA all the subscriptions have been unsubscribed).

Posted on by:

likeomgitsfeday profile

Jennifer Wadella

@likeomgitsfeday

Force of nature. Angular lead @ Bitovi. International & keynote tech speaker. Foodie. #kcnative. Community organizer and WiT advocate.

Bitovi

Business requirements go in — High-quality software comes out. Fortune 5 and startups trust us to make their life easier. Senior UX and JavaScript experts.

Discussion

pic
Editor guide