When it comes to RxJS, multicasting is probably one of its most daunting features. It requires prior RxJS knowledge and is, in itself, a feature that some programmers struggle to grasp. On top of that, there are quite a few multicasting operators, and picking the right one can be difficult if we don't understand how each of them works and which particular problem it solves.
This is an introductory guide to the RxJS multicasting operators. By the end of this guide, you'll (hopefully) understand these operators a little better, and know when and why you need to use each one. We will start by taking a look at the hot/cold Observable concept, and we'll then make our way through all of the multicasting operators, discussing their particularities and looking at examples, until we understand them all.
I've created a StackBlitz project for each and every one of the examples that we'll see in this guide, so that you can play around with the code snippets, since, in my humble opinion, this makes it considerably easier to understand how the operators work. You can find the link to the corresponding StackBlitz project in the caption below each example.
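Before we get started, here is a list of all the operators that we'll cover today:
- multicast
- publish
- publishReplay
- publishLast
- publishBehavior
- refCount
- share
- shareReplay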
Note: If you're already familiar with these concepts, feel free to skip this section and move on to the next.
Most Observables are cold by default. Each time we subscribe to a cold Observable, its producer is recreated. So what does this mean exactly? First, we must understand what a producer is: in a nutshell, it's the source of our Observable's values. It can be a DOM event, a callback, an HTTP request, an iterator and so on. In short, anything that can produce values and pass them to an observer.
Now that we know what a producer is, it'll be easier to understand the meaning of the previous statement, which basically means that our Observable's producer is created over and over again, once for each subscription. Let's take a look at an example:
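A minimal sketch of such an example, assuming RxJS 6 or 7. The names cold$ and sideEffects are illustrative only, and the source emits synchronously so that the result is easy to follow:

```typescript
import { Observable } from 'rxjs';

let sideEffects = 0; // counts how many times the producer is created

// A cold Observable: the function below (the producer) runs once per subscription.
const cold$ = new Observable<number>((subscriber) => {
  sideEffects++;
  console.log('Side effect: producer created');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const receivedA: number[] = [];
const receivedB: number[] = [];

cold$.subscribe((value) => receivedA.push(value));
cold$.subscribe((value) => receivedB.push(value));

// Two subscriptions, so the producer (and its side effect) ran twice.
console.log('Side effects:', sideEffects);
```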
As you can see, because our Observable is cold, and its producer is being recreated with each subscription, the side effect is being executed twice, one time for each subscription. If the Observable were hot, the side effect would be performed only once, regardless of how many times we subscribe.
Some may think, after seeing the previous code, that the consequences derived from this behavior aren't particularly important, which is why I always like to explain this concept, and its considerable importance, by using an HTTP request as an example.
Let's imagine that we have an Ajax Observable that gets some data for us. Because the Ajax Observable is cold, each time we subscribe to it, a new HTTP request is made. Yes, you read correctly, a new request is made for each subscription. 20 subscriptions = 20 HTTP requests. Let's take a look at some code:
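To keep the sketch runnable without a network, the code below uses a hypothetical fakeGet function as a stand-in for an Ajax Observable (it is not part of RxJS), and the URL is made up. It only counts how many "requests" would be sent:

```typescript
import { Observable } from 'rxjs';

let requestCount = 0;

// Hypothetical stand-in for an Ajax Observable: every subscription "sends" a request.
function fakeGet(url: string): Observable<string> {
  return new Observable<string>((subscriber) => {
    requestCount++; // a real Ajax Observable would open a new HTTP request here
    subscriber.next(`response from ${url}`);
    subscriber.complete();
  });
}

const users$ = fakeGet('/api/users'); // hypothetical endpoint

for (let i = 0; i < 20; i++) {
  users$.subscribe();
}

// 20 subscriptions to a cold Observable means 20 requests.
console.log('Requests sent:', requestCount);
```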
Upon seeing this, I believe that the importance of properly handling hot/cold Observables becomes exceedingly clear. Having the producer recreated with each subscription is something we most certainly don't want happening in our code. So, how do we fix this serious issue? By making our cold Observables hot. How do we do that? With multicasting operators! So, without further ado, let's get started on those operators.
Note: There is a wonderful article by Ben Lesh that treats the topic of hot/cold Observables in-depth. You can find it here.
multicast shares the source Observable by using a Subject. Let's take a look at an example using multicast:
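A sketch of what this might look like, assuming RxJS 6 or 7 (where multicast is available, though deprecated in version 7). The source here is a simple synchronous Observable rather than a timer, and connect() is deliberately not called:

```typescript
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

let sideEffects = 0;

const source$ = new Observable<number>((subscriber) => {
  sideEffects++; // runs once per subscription to the source
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// multicast routes every observer through the Subject returned by the factory.
const multicasted$ = source$.pipe(
  multicast(() => new Subject<number>())
) as ConnectableObservable<number>;

const received: number[] = [];
multicasted$.subscribe((value) => received.push(value));
multicasted$.subscribe((value) => received.push(value));

// Nothing has been received, and the source has not even been subscribed to.
console.log('Received:', received.length, 'Side effects:', sideEffects);
```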
Have you tried the StackBlitz? Noticed anything strange? If we run the previous code, we won't receive any values at all! Why isn't our source Observable emitting anything?
multicast returns a special kind of Observable: a ConnectableObservable. This special type of Observable has a connect() method, which, when called, is responsible for subscribing to the source Observable with the Subject that we provided.
This means that if we don't call connect(), the source will never be subscribed to, and will never begin emitting values. So, let's change our previous code, adding a call to connect():
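A possible version with the connect() call added, again assuming RxJS 6 or 7 and an illustrative synchronous source:

```typescript
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

let sideEffects = 0;

const source$ = new Observable<number>((subscriber) => {
  sideEffects++; // runs once per subscription to the source
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const multicasted$ = source$.pipe(
  multicast(() => new Subject<number>())
) as ConnectableObservable<number>;

const receivedA: number[] = [];
const receivedB: number[] = [];
multicasted$.subscribe((value) => receivedA.push(value));
multicasted$.subscribe((value) => receivedB.push(value));

// connect() subscribes the internal Subject to the source; values start flowing now.
multicasted$.connect();

// Both observers received 1 and 2, yet the source was subscribed to only once.
console.log(receivedA, receivedB, 'Side effects:', sideEffects);
```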
Et voilà! Our code is now working as it should. Since multicast is sharing the source Observable, the side effect will only be executed once, even if we were to subscribe 1000 times.
As with all Observables, it's important to unsubscribe from our multicast Observables to avoid memory leaks. Bear in mind that, when dealing with multicasting operators that return a ConnectableObservable, it is the subscription returned by connect() that we need to unsubscribe from, since that is the one that subscribes to the source.
Let's take our previous code snippet, remove the take(2) from the source that was taking care of ending our Observable for us, and unsubscribe:
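The sketch below assumes RxJS 6 or 7 and uses a hand-written endless source so that its teardown can be observed through the illustrative sourceTornDown flag. For brevity it unsubscribes right away; in a real application this would happen later, for example when a component is destroyed:

```typescript
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

let sourceTornDown = false;

// An endless source (no take(2)); the returned function runs on unsubscription.
const source$ = new Observable<number>((subscriber) => {
  let count = 0;
  const timer = setInterval(() => subscriber.next(count++), 1000);
  return () => {
    clearInterval(timer);
    sourceTornDown = true;
  };
});

const multicasted$ = source$.pipe(
  multicast(() => new Subject<number>())
) as ConnectableObservable<number>;

const observerSubscription = multicasted$.subscribe((value) => console.log('Observer:', value));
const connection = multicasted$.connect(); // the subscription to the source

// Unsubscribing the observer alone does not stop the source...
observerSubscription.unsubscribe();
const tornDownAfterObserverOnly = sourceTornDown;

// ...unsubscribing the connection does.
connection.unsubscribe();

console.log(tornDownAfterObserverOnly, sourceTornDown);
```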
Memory leak successfully avoided!
In a perfect sandbox environment, all the subscriptions to the multicast Observable happen at the same time. However, what are the odds of finding this kind of behavior in real life? I can assure you that they are not very good. Let's take a look at a more realistic example, where we have different subscriptions occurring at different times:
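One way to sketch this deterministically, assuming RxJS 6 or 7: a manually driven Subject named ticks$ (an illustrative stand-in for a timer) lets us decide exactly when values are emitted relative to each subscription:

```typescript
import { ConnectableObservable, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

// Manually driven source, standing in for a timer such as an interval.
const ticks$ = new Subject<number>();

const multicasted$ = ticks$.pipe(
  multicast(() => new Subject<number>())
) as ConnectableObservable<number>;

const early: number[] = [];
const late: number[] = [];

multicasted$.subscribe((value) => early.push(value));
multicasted$.connect();

ticks$.next(0);
ticks$.next(1);

// The second observer arrives after two values have already been emitted.
multicasted$.subscribe((value) => late.push(value));

ticks$.next(2);

// The early observer saw 0, 1 and 2; the late one only saw 2.
console.log(early, late);
```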
Well, it seems that our late observer is missing out on the values that were emitted before it subscribed to the multicast Observable. This can lead to unexpected behavior and major headaches trying to discover the cause.
So, how can we solve this problem? It's actually quite simple: all we have to do is use a ReplaySubject instead of a regular Subject. Since ReplaySubjects replay old values to new subscribers, our issue is effectively solved:
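The same kind of sketch with the Subject swapped for a ReplaySubject, assuming RxJS 6 or 7 and the same illustrative, manually driven ticks$ source:

```typescript
import { ConnectableObservable, ReplaySubject, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

// Manually driven source, standing in for a timer such as an interval.
const ticks$ = new Subject<number>();

// The only change: the factory now returns a ReplaySubject.
const multicasted$ = ticks$.pipe(
  multicast(() => new ReplaySubject<number>())
) as ConnectableObservable<number>;

const early: number[] = [];
const late: number[] = [];

multicasted$.subscribe((value) => early.push(value));
multicasted$.connect();

ticks$.next(0);
ticks$.next(1);

// On subscribing, the late observer is replayed 0 and 1 first.
multicasted$.subscribe((value) => late.push(value));

ticks$.next(2);

// Both observers end up with 0, 1 and 2.
console.log(early, late);
```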
Et voilà! Our late observers now have access to the previously emitted values.
We can all agree that multicast is an amazing operator, but having to type multicast(() => new Subject()) each time we want to multicast our streams can get a little verbose…
publish operator to the rescue!
publish basically uses multicast plus a Subject under the hood, so that we don't have to go to the trouble of typing it. Pretty cool, right? Let's take a look at an example:
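A minimal sketch using publish, assuming RxJS 6 or 7 (publish is deprecated in version 7 but still available). The synchronous source and the variable names are illustrative:

```typescript
import { ConnectableObservable, Observable } from 'rxjs';
import { publish } from 'rxjs/operators';

let sideEffects = 0;

const source$ = new Observable<number>((subscriber) => {
  sideEffects++; // runs once per subscription to the source
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// publish() replaces multicast(() => new Subject()).
const published$ = source$.pipe(publish<number>()) as ConnectableObservable<number>;

const receivedA: number[] = [];
const receivedB: number[] = [];
published$.subscribe((value) => receivedA.push(value));
published$.subscribe((value) => receivedB.push(value));

// Still a ConnectableObservable, so connect() is what starts the source.
published$.connect();

console.log(receivedA, receivedB, 'Side effects:', sideEffects);
```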
Remember, we still have to call connect() if we want our source Observable to be subscribed to!
Remember the issue we had with late subscribers and multicast? How do we deal with them in this case? Since publish is equivalent to using multicast(() => new Subject()), we can't just change the Subject for a ReplaySubject manually. Luckily for us, publish has several variants, one for every kind of Subject that there is. Let's take a look at them:
publishReplay is equivalent to multicast(() => new ReplaySubject()). Since we've already seen an example with multicast + ReplaySubject, we know that it allows late subscribers to receive the emissions prior to their subscription:
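Sketched with publishReplay, assuming RxJS 6 or 7 and an illustrative, manually driven ticks$ source in place of a timer:

```typescript
import { ConnectableObservable, Subject } from 'rxjs';
import { publishReplay } from 'rxjs/operators';

// Manually driven source, standing in for a timer such as an interval.
const ticks$ = new Subject<number>();

// With no arguments, every past value is kept for replay.
const replayed$ = ticks$.pipe(publishReplay<number>()) as ConnectableObservable<number>;

const early: number[] = [];
const late: number[] = [];

replayed$.subscribe((value) => early.push(value));
replayed$.connect();

ticks$.next(0);
ticks$.next(1);

// The late observer is replayed 0 and 1 as soon as it subscribes.
replayed$.subscribe((value) => late.push(value));

ticks$.next(2);

console.log(early, late);
```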
publishLast is equivalent to multicast(() => new AsyncSubject()). It waits until the source Observable has completed, and only then emits the last value. Here's an example:
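A small sketch of this behavior, assuming RxJS 6 or 7. The manually driven ticks$ source is illustrative and lets us check what has been received before and after completion:

```typescript
import { ConnectableObservable, Subject } from 'rxjs';
import { publishLast } from 'rxjs/operators';

// Manually driven source, so that we control when it completes.
const ticks$ = new Subject<number>();

const last$ = ticks$.pipe(publishLast<number>()) as ConnectableObservable<number>;

const received: number[] = [];
last$.subscribe((value) => received.push(value));
last$.connect();

ticks$.next(1);
ticks$.next(2);

// Nothing has reached the observer yet: the source has not completed.
const receivedBeforeComplete = received.length;

ticks$.complete();

// Only the last value (2) is delivered, and only upon completion.
console.log(receivedBeforeComplete, received);
```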
publishBehavior is equivalent to multicast(() => new BehaviorSubject()). Since it uses a BehaviorSubject, publishBehavior allows us to specify an initial value:
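As a sketch, assuming RxJS 6 or 7, an initial value of -1 chosen purely for illustration, and a manually driven ticks$ source:

```typescript
import { ConnectableObservable, Subject } from 'rxjs';
import { publishBehavior } from 'rxjs/operators';

// Manually driven source, standing in for a timer such as an interval.
const ticks$ = new Subject<number>();

// -1 is the initial value held by the underlying BehaviorSubject.
const withInitial$ = ticks$.pipe(publishBehavior(-1)) as ConnectableObservable<number>;

const early: number[] = [];
const late: number[] = [];

// Subscribers immediately receive the current value: here, the initial -1.
withInitial$.subscribe((value) => early.push(value));
withInitial$.connect();

ticks$.next(0);
ticks$.next(1);

// A late subscriber immediately receives the latest value (1), not the whole history.
withInitial$.subscribe((value) => late.push(value));

ticks$.next(2);

console.log(early, late);
```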
We now know about several amazing operators to share our streams. However, having to call connect() gets old quick. It's verbose and, should we forget to call it, we'd probably end up wasting time figuring out why our Observables aren't emitting. So, isn't there a better alternative?
Of course there is! Allow me to introduce the refCount operator.
refCount is in charge of counting the number of subscriptions to the source internally, which takes care of two crucial things for us:
- If the number of subscriptions is larger than 0, a.k.a. there's at least one subscriber, refCount subscribes (once only) to the source, calling connect().
- If the number of subscriptions is smaller than 1, a.k.a. there aren't any subscribers, refCount unsubscribes from the source.
Let's refactor our previous code to include refCount:
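A sketch of the refCount behavior, assuming RxJS 6 or 7 (refCount is deprecated in version 7 but still available). The counters sourceSubscriptions and sourceTeardowns are illustrative, and the subscribers leave immediately for brevity:

```typescript
import { Observable, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

let sourceSubscriptions = 0;
let sourceTeardowns = 0;

// Endless source that records when it is subscribed to and torn down.
const source$ = new Observable<number>((subscriber) => {
  sourceSubscriptions++;
  let count = 0;
  const timer = setInterval(() => subscriber.next(count++), 1000);
  return () => {
    clearInterval(timer);
    sourceTeardowns++;
  };
});

// No connect() call anywhere: refCount takes care of it.
const shared$ = source$.pipe(
  multicast(() => new Subject<number>()),
  refCount()
);

const subA = shared$.subscribe((value) => console.log('A:', value)); // first subscriber: connects
const subB = shared$.subscribe((value) => console.log('B:', value)); // reuses the connection

subA.unsubscribe();
const teardownsWithOneSubscriberLeft = sourceTeardowns; // source still running

subB.unsubscribe(); // last subscriber gone: refCount unsubscribes from the source

console.log(sourceSubscriptions, teardownsWithOneSubscriberLeft, sourceTeardowns);
```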
As you can see, refCount takes care of calling connect() and of unsubscribing from the source Observable for us.
Last, but not least, we have the share operator, which is equivalent to using multicast(() => new Subject()) + refCount. It's the easiest and most commonly used multicasting operator, since it takes care of everything under the hood. Here's our previous example, refactored to use share:
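A possible sketch with share, assuming RxJS 6 or 7. Here the illustrative ticks$ Subject drives the stream by hand, and a tap counts how often the shared pipeline actually runs:

```typescript
import { Subject } from 'rxjs';
import { share, tap } from 'rxjs/operators';

// Manually driven source, standing in for a timer such as an interval.
const ticks$ = new Subject<number>();
let sideEffects = 0;

const shared$ = ticks$.pipe(
  tap(() => sideEffects++), // would run once per subscriber without share()
  share()
);

const receivedA: number[] = [];
const receivedB: number[] = [];
const subA = shared$.subscribe((value) => receivedA.push(value));
const subB = shared$.subscribe((value) => receivedB.push(value));

ticks$.next(0);
ticks$.next(1);

// Two values, two subscribers, but the side effect ran only once per value.
const sideEffectsWhileSubscribed = sideEffects;

subA.unsubscribe();
subB.unsubscribe();

// With no subscribers left, share has unsubscribed from the source.
ticks$.next(2);

console.log(receivedA, receivedB, sideEffectsWhileSubscribed, sideEffects);
```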
Just for fun, let's take a look at a slightly more realistic example, featuring a shared Ajax Observable instead of a boring old interval:
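The sketch below avoids real network access by using a hypothetical fakeGet function (not part of RxJS) that "responds" after a short delay. The URL and the 10 ms delay are assumptions made for illustration:

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

let requestCount = 0;

// Hypothetical stand-in for an Ajax Observable with an asynchronous response.
function fakeGet(url: string): Observable<string> {
  return new Observable<string>((subscriber) => {
    requestCount++; // a real Ajax Observable would send the HTTP request here
    const timer = setTimeout(() => {
      subscriber.next(`response from ${url}`);
      subscriber.complete();
    }, 10);
    return () => clearTimeout(timer);
  });
}

const users$ = fakeGet('/api/users').pipe(share()); // hypothetical endpoint

for (let i = 0; i < 20; i++) {
  users$.subscribe((response) => console.log(`Subscriber ${i}:`, response));
}

// All 20 subscribers share the single request that is currently in flight.
console.log('Requests sent:', requestCount);
```

Note that a subscriber arriving after the shared request has completed would trigger a new request, since share starts over once the source completes.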
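Once again, we must remember our late subscribers. In this case, share only has one variant, shareReplay. As you can imagine, shareReplay is roughly equivalent to multicast(() => new ReplaySubject()) + refCount (in RxJS 6.4 and later, the source is only unsubscribed from when the subscriber count drops to zero if the refCount option is set to true). Here's an example: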
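A sketch with shareReplay, assuming RxJS 6 or 7 and the same kind of illustrative, manually driven ticks$ source. Called without arguments, shareReplay keeps every past value for replay:

```typescript
import { Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Manually driven source, standing in for a timer such as an interval.
const ticks$ = new Subject<number>();

const shared$ = ticks$.pipe(shareReplay());

const early: number[] = [];
const late: number[] = [];

// No connect() needed: the first subscription starts the source.
shared$.subscribe((value) => early.push(value));

ticks$.next(0);
ticks$.next(1);

// The late subscriber is replayed 0 and 1 before receiving new values.
shared$.subscribe((value) => late.push(value));

ticks$.next(2);

console.log(early, late);
```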
- publish is equivalent to multicast(() => new Subject()).
- publishBehavior is equivalent to multicast(() => new BehaviorSubject()).
- publishLast is equivalent to multicast(() => new AsyncSubject()).
- publishReplay is equivalent to multicast(() => new ReplaySubject()).
- With refCount, we no longer have to manually call connect(), nor do we have to take care of unsubscribing.
- share is equivalent to multicast(() => new Subject()) + refCount.
- shareReplay is equivalent to multicast(() => new ReplaySubject()) + refCount.
That's all, folks! I hope this post helped you understand multicasting operators a little better, and realize that multicasting is not as hard as it initially seems.
As I mentioned earlier, I've created a StackBlitz project for each and every example in this guide, so feel free to play around with the code. I promise that it really helps to better understand how the operators work.
If you enjoyed this guide, feel free to leave a nice comment! If you have any questions, you can drop them in the comments and I'll try my best to answer.