DEV Community

Giancarlo Buomprisco

Posted on • Originally published at blog.bitsrc.io

RxJS Subjects in Depth

This article was originally published on Bits and Pieces by Giancarlo Buomprisco

In this article, I want to explore RxJS’s implementation of Subjects, a utility that is getting increasing awareness and love from the community.

In the past, I have used Subjects in a variety of ways, but sometimes without fully understanding how they work internally and how they differ from Observables.

This is what the article will cover:

  • What is a Subject?

  • Multicasting and Unicasting

  • Other types of Subject: AsyncSubject, ReplaySubject, and BehaviorSubject

What is a Subject?

Let’s start with a simple question: what is a Subject?
According to Rx’s website:

A Subject is a special type of Observable that allows values to be multicasted to many Observers.

Subjects are like EventEmitters.

If this is unclear, hang on: by the end of the article you’ll have a much clearer understanding of what a Subject is and all the ways you can make use of it.

The definition in the Rx documentation initially struck me: in fact, I had always thought of Subjects as purely a way to both pull and push values using streams. It turns out I didn’t fully understand them, even after having used them daily for about 5 years.

Subject

Subject is a class that internally extends Observable. A Subject is both an Observable and an Observer that allows values to be multicasted to many Observers, unlike Observables, where each subscriber owns an independent execution of the Observable.

That means:

  • you can subscribe to a Subject to pull values from its stream

  • you can feed values to the stream by calling the method next()

  • you can even pass a Subject as an Observer to an Observable: as mentioned above, a Subject is also an Observer and, as such, it implements the methods next, error and complete

Let’s see a quick example:

    import { Subject, of } from 'rxjs';

    const subject$ = new Subject();

    // Pull values
    subject$.subscribe({
      next: console.log,
      complete: () => console.log('Complete!')
    });

    // Push values
    subject$.next('Hello World');

    // Use Subject as an Observer
    const numbers$ = of(1, 2, 3);
    numbers$.subscribe(subject$);

    /* Output below */

    // Hello World
    // 1
    // 2
    // 3
    // Complete!

The Internals of a Subject

Internally, every Subject maintains a registry (as an array) of observers. This is how a Subject works, in a nutshell:

  • every time a new observer subscribes, the Subject will store the observer in the observers' array

  • when a new item is emitted (i.e. the method next() was called), the Subject will loop through the observers and emit the same value to each one of them (multicasting). The same will happen when it errors or completes

  • when a Subject completes, all the observers will be automatically unsubscribed

  • when a Subject itself is unsubscribed, instead, the existing subscriptions stay alive. The observers’ array is nullified, but the observers are not unsubscribed. If you attempt to emit a value from an unsubscribed Subject, it will throw an error (ObjectUnsubscribedError in RxJS 6 and 7). The best course of action is to complete your subjects when you need to dispose of them and their observers

  • when one of the observers is unsubscribed, it will then be removed from the registry
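
The registry behavior described above can be sketched in plain TypeScript. This is a simplified model under stated assumptions, not the RxJS source: MiniSubject, its Observer type and the returned unsubscribe function are hypothetical names, and error handling and the "unsubscribed Subject" case are left out.

```typescript
// A simplified model of a Subject's observer registry.
// MiniSubject and its members are hypothetical names, not RxJS source code.
type Observer<T> = {
  next: (value: T) => void;
  complete?: () => void;
};

class MiniSubject<T> {
  private observers: Observer<T>[] = [];
  private stopped = false;

  // Store the observer in the registry and return a function that removes it.
  subscribe(observer: Observer<T>): () => void {
    if (this.stopped) {
      // Already completed: notify the observer right away and store nothing.
      observer.complete?.();
      return () => {};
    }
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Multicast: loop over a copy of the registry and hand each observer the same value.
  next(value: T): void {
    if (this.stopped) return;
    for (const observer of [...this.observers]) observer.next(value);
  }

  // Notify every observer of completion, then empty the registry.
  complete(): void {
    if (this.stopped) return;
    this.stopped = true;
    const current = this.observers;
    this.observers = [];
    for (const observer of current) observer.complete?.();
  }
}

const log: string[] = [];
const mini = new MiniSubject<number>();

const unsubscribeA = mini.subscribe({ next: (v) => log.push(`A:${v}`) });
mini.subscribe({
  next: (v) => log.push(`B:${v}`),
  complete: () => log.push("B:complete"),
});

mini.next(1);    // both observers receive 1
unsubscribeA();  // A is removed from the registry
mini.next(2);    // only B receives 2
mini.complete(); // B is notified, then the registry is emptied
mini.next(3);    // ignored: the subject has completed

console.log(log); // [ 'A:1', 'B:1', 'B:2', 'B:complete' ]
```

Iterating over a copy of the array is a deliberate choice in this sketch: an observer that unsubscribes while a value is being delivered does not disturb the loop.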

Multicasting

Passing a Subject as an Observer converts the Observable’s behavior from unicast to multicast. Using a Subject is, indeed, the only way to make an Observable multicast (operators such as share use one internally), which means that a single execution is shared among multiple Observers.

Wait, though: what does sharing execution actually mean? Let’s see two examples to understand the concept better.

Let’s use the observable interval as an example: we want to create an observable that emits every 1000 ms (1 second), and we want to share the execution with all the subscribers, regardless of when they subscribed.


    import { Subject, interval } from 'rxjs';

    const subject$ = new Subject<number>();

    const observer = {
      next: console.log
    };

    const observable$ = interval(1000);

    // subscribe after 1 second
    setTimeout(() => {
      console.log("Subscribing first observer");    
      subject$.subscribe(observer);
    }, 1000);

    // subscribe after 2 seconds
    setTimeout(() => {
      console.log("Subscribing second observer");
      subject$.subscribe(observer);
    }, 2000);

    // subscribe using subject$ as an observer
    observable$.subscribe(subject$);

Let’s summarize the snippet above:

  • we create a subject called subject$ and an observer which simply logs the current value after each emission

  • we create an observable that emits every 1 second (using interval)

  • we subscribe two observers to the subject, after 1 and 2 seconds respectively

  • finally, we use the subject as an observer and subscribe to the interval observable

Let’s look at the output (shown as an image in the original post):

Even though the second observer subscribed 1 second after the first, from that point on the values emitted to the 2 observers are exactly the same. Indeed, they share the same source observable.

Another common example that shows the usefulness of multicasting is subscribing to an observable that executes an HTTP request, a scenario that happens often in frameworks such as Angular: by multicasting the observable, you can avoid executing multiple requests and share a single execution among multiple subscribers, which will all receive the same value.
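
To make the difference concrete without a network or RxJS itself, here is a minimal sketch in plain TypeScript. All names are hypothetical: fakeRequest stands in for an observable that performs one HTTP request per subscription, and the listeners array plays the role of the Subject.

```typescript
// Hypothetical stand-ins: fakeRequest models an observable that performs one
// request per subscription. None of these names are RxJS APIs.
type Listener = (body: string) => void;

let requestCount = 0;

// Unicast: every call starts a new, independent execution of the producer.
function fakeRequest(listener: Listener): void {
  requestCount += 1;
  listener(`response #${requestCount}`);
}

// Subscribing twice directly runs the request twice.
const unicastLog: string[] = [];
fakeRequest((body) => unicastLog.push(`A got ${body}`));
fakeRequest((body) => unicastLog.push(`B got ${body}`));
const requestsWhenUnicast = requestCount; // 2

// Multicast: listeners register with a relay first, then the relay subscribes
// to the source once and forwards the single response to all of them.
requestCount = 0;
const multicastLog: string[] = [];
const listeners: Listener[] = [];
listeners.push((body) => multicastLog.push(`A got ${body}`));
listeners.push((body) => multicastLog.push(`B got ${body}`));
fakeRequest((body) => listeners.forEach((listener) => listener(body)));
const requestsWhenMulticast = requestCount; // 1

console.log(requestsWhenUnicast, unicastLog);
// 2 [ 'A got response #1', 'B got response #2' ]
console.log(requestsWhenMulticast, multicastLog);
// 1 [ 'A got response #1', 'B got response #1' ]
```

Note that in this sketch the listeners are registered before the source runs. A relay with no memory cannot deliver a response to a listener that registers afterwards.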

AsyncSubject

I personally find AsyncSubject the least known type of Subject, simply because I never really needed it, or more likely I didn’t know I could have needed it.

In short, the AsyncSubject will:

  • emit only once it completes

  • emit only the latest value it received


    import { AsyncSubject } from 'rxjs';

    const asyncSubject$ = new AsyncSubject();

    asyncSubject$.next(1);
    asyncSubject$.next(2);
    asyncSubject$.next(3);

    asyncSubject$.subscribe(console.log);

    // ... nothing happening!

    asyncSubject$.complete();

    // 3

As you can see, even if we subscribed, nothing happened until we called the method complete.

ReplaySubject

Before introducing a ReplaySubject, let’s see a common situation where normal Subjects are used:

  • we create a subject

  • somewhere in our app, we start pushing values to the subject, but there’s no subscriber yet

  • at some point, the first observer subscribes

  • we expect the observer to receive the values (all of them? or only the last one?) that were previously pushed through the subject

  • … nothing happening! In fact, a Subject has no memory


    import { Subject } from 'rxjs';

    const subject$ = new Subject();

    // somewhere else in our app
    subject$.next(/* value */);

    // somewhere in our app
    subject$.subscribe(/* do something */);

    // nothing happening

This is one of the situations where a ReplaySubject can help us: a ReplaySubject records the values emitted and pushes them to each new observer when it subscribes.

Let’s get back to the question above: does a ReplaySubject replay all the emissions or only the latest one?

Well, by default, the subject will replay all the items emitted, but we can provide an argument called bufferSize. This argument defines the number of emissions that the ReplaySubject should keep in its memory:


    import { ReplaySubject } from 'rxjs';

    const subject$ = new ReplaySubject(1);

    subject$.next(1);
    subject$.next(2);
    subject$.next(3);

    subject$.subscribe(console.log);

    // Output
    // 3

There’s also a second argument, windowTime, that can be passed to ReplaySubject in order to define how long (in milliseconds) the old values should be stored in memory.


    import { ReplaySubject } from 'rxjs';

    const subject$ = new ReplaySubject(100, 250);

    setTimeout(() => subject$.next(1), 50);
    setTimeout(() => subject$.next(2), 100);
    setTimeout(() => subject$.next(3), 150);
    setTimeout(() => subject$.next(4), 200);
    setTimeout(() => subject$.next(5), 250);

    setTimeout(() => {
      subject$.subscribe(v => console.log('SUBSCRIPTION A', v));
    }, 200);

    setTimeout(() => {
      subject$.subscribe(v => console.log('SUBSCRIPTION B', v));
    }, 400);

  • we create a ReplaySubject whose bufferSize is 100 and windowTime is 250 ms

  • we emit 5 values, one every 50 ms

  • we subscribe the first time after 200 ms and the second time after 400 ms

Let’s analyze the output:


    SUBSCRIPTION A 1
    SUBSCRIPTION A 2
    SUBSCRIPTION A 3
    SUBSCRIPTION A 4
    SUBSCRIPTION A 5
    SUBSCRIPTION B 4
    SUBSCRIPTION B 5

Subscription A received all five items: the ones emitted before it subscribed were replayed, and the rest arrived live. Subscription B was only able to replay items 4 and 5, as they were the only ones emitted within the 250 ms window before it subscribed.
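
The interplay of bufferSize and windowTime can be modeled in a few lines of plain TypeScript. This is only a sketch under assumptions: MiniReplaySubject is a hypothetical class, the clock is injected by hand instead of using real timers so the run is deterministic, and an entry is assumed to expire once its age reaches windowTime, which is the rule that reproduces the output above.

```typescript
// Simplified model of a replay buffer. MiniReplaySubject is a hypothetical
// class, not RxJS code; the clock is injected so the example is deterministic.
class MiniReplaySubject<T> {
  private buffer: { value: T; time: number }[] = [];
  private observers: ((value: T) => void)[] = [];
  private bufferSize: number;
  private windowTime: number;
  private now: () => number;

  constructor(bufferSize: number, windowTime: number, now: () => number) {
    this.bufferSize = bufferSize;
    this.windowTime = windowTime;
    this.now = now;
  }

  // Drop the oldest entries beyond bufferSize, then entries that are too old.
  private trim(): void {
    const excess = this.buffer.length - this.bufferSize;
    if (excess > 0) this.buffer.splice(0, excess);
    const current = this.now();
    // Assumption: an entry expires once its age reaches windowTime.
    this.buffer = this.buffer.filter((e) => current - e.time < this.windowTime);
  }

  next(value: T): void {
    this.buffer.push({ value, time: this.now() });
    this.trim();
    for (const observer of [...this.observers]) observer(value);
  }

  subscribe(observer: (value: T) => void): void {
    this.trim();
    for (const entry of this.buffer) observer(entry.value); // replay first
    this.observers.push(observer); // then receive live values
  }
}

let clock = 0; // fake time in milliseconds
const output: string[] = [];
const replay = new MiniReplaySubject<number>(100, 250, () => clock);

clock = 50;  replay.next(1);
clock = 100; replay.next(2);
clock = 150; replay.next(3);
clock = 200; replay.next(4);
clock = 200; replay.subscribe((v) => output.push(`A ${v}`)); // replays 1 to 4
clock = 250; replay.next(5);                                 // A receives 5 live
clock = 400; replay.subscribe((v) => output.push(`B ${v}`)); // replays 4 and 5

console.log(output);
// [ 'A 1', 'A 2', 'A 3', 'A 4', 'A 5', 'B 4', 'B 5' ]
```

At time 400, item 3 (emitted at 150) is exactly 250 ms old and is dropped under the assumed rule, while items 4 and 5 are still inside the window.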

BehaviorSubject

BehaviorSubject is probably the most well-known subclass of Subject. This kind of Subject represents the “current value”.

Interestingly, the Combine framework named it CurrentValueSubject

Similarly to ReplaySubject, it will also replay the current value whenever an observer subscribes to it.

In order to use BehaviorSubject, we need to provide a mandatory initial value when instantiating it.

    import { BehaviorSubject } from 'rxjs';

    const subject$ = new BehaviorSubject(0); // 0 is the initial value

    subject$.next(1);

    setTimeout(() => {
      subject$.subscribe(console.log);
    }, 200);

    // 1

Whenever a new value is emitted, the BehaviorSubject stores it in the property value, which is publicly accessible (also through the method getValue()).
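
The "current value" idea can be sketched as follows. This is a plain TypeScript model, not the RxJS implementation: MiniBehaviorSubject is a hypothetical class that only mirrors the two behaviors described here, storing the latest value and replaying it on subscription.

```typescript
// Simplified model; MiniBehaviorSubject is a hypothetical name, not RxJS code.
class MiniBehaviorSubject<T> {
  private current: T;
  private observers: ((value: T) => void)[] = [];

  constructor(initialValue: T) {
    this.current = initialValue;
  }

  // Synchronous read access to the latest value.
  get value(): T {
    return this.current;
  }

  next(value: T): void {
    this.current = value; // remember the value before multicasting it
    for (const observer of [...this.observers]) observer(value);
  }

  subscribe(observer: (value: T) => void): void {
    observer(this.current); // new subscribers get the current value immediately
    this.observers.push(observer);
  }
}

const seen: number[] = [];
const counter = new MiniBehaviorSubject<number>(0);
const before = counter.value; // 0, the initial value

counter.next(1);
counter.subscribe((v) => seen.push(v)); // receives 1 on subscription
counter.next(2);                        // receives 2 live

console.log(before, counter.value, seen); // 0 2 [ 1, 2 ]
```

Reading value is handy for one-off synchronous checks, while subscribing remains the way to react to every change.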

Final Words

Rx Subjects are quite powerful tools, and like any powerful tool in software engineering, they can also be easily abused. The concept of unicasting and multicasting is a striking distinction that you need to take into account when working with Rx.

Understanding how Subjects work internally can be fairly beneficial to avoid common pitfalls and bugs, but also to understand when you need them and when, instead, you don’t.

If you need any clarifications, or if you think something is unclear or wrong, do please leave a comment!

I hope you enjoyed this article! If you did, follow me on Medium, Twitter or my website for more articles about Software Development, Front End, RxJS, Typescript and more!

Top comments (4)

yehonatan yehezkel

What would you use in a case where you need to push values, but you also need unicasting?

Giancarlo Buomprisco

Hi Yehonatan, interesting question!

Do you have a use case in mind? Without knowing the exact scenario, I think you don't need a Subject in such a case; you could create the Observable yourself and push values from the inside.

yehonatan yehezkel • Edited

E.g. in Angular, when the observable is in a service but you push values from the component after the user clicks.
What is the best practice for pushing values into an observable from the inside?

Giancarlo Buomprisco

The only way to do that is to create a new Observable using new Observable(observer => ...), where you define the source inside the observable and push values, errors and completion with observer.next, error and complete. Hope this helps!