A Brief Introduction To RxJS

Adarsh ・13 min read

RxJS, the JavaScript implementation of ReactiveX, is an asynchronous programming API built on the principles of Reactive Programming. It borrows concepts from functional programming and from design patterns such as Observer, Publisher-Subscriber and Broadcast.

What is Reactive Programming?

Reactive Programming is a paradigm, like Object-Oriented Programming, that emphasizes that our code should react whenever its input values change. This paradigm is useful for handling large volumes of asynchronous data such as streams, events and the propagation of change that happens when an event is triggered.
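To make "code reacts when an input value changes" concrete, here is a minimal sketch in plain JavaScript with no library. The createCell helper and its method names are hypothetical, invented only for this illustration; RxJS provides much richer tools for the same idea.

```javascript
// Hypothetical helper: a value holder that notifies listeners on every change.
function createCell(initialValue) {
  let value = initialValue;
  const listeners = [];
  return {
    get: () => value,
    set: (next) => {
      value = next;
      listeners.forEach((listener) => listener(value));
    },
    onChange: (listener) => listeners.push(listener),
  };
}

const price = createCell(10);
const log = [];

// The derived text is recomputed by reacting to changes instead of polling.
price.onChange((p) => log.push(`total with tax: ${(p * 1.2).toFixed(2)}`));

price.set(20);
price.set(50);
console.log(log);
```

The listener never asks for the price; it is pushed each new value, which is the same direction of data flow that observables use.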

You might be thinking "Reactive programming sounds a lot like what we already do when we code asynchronous tasks". Yes, it does, but the code we write to manage asynchronous calls can get quite messy and error-prone as the application scales up. A piece of asynchronous code can quickly turn into a nightmare for developers when it's not handled properly.

Reactive Programming lays out practices and principles that allow developers to manage asynchronous calls while making the code more readable, purer and less error-prone. The idea of reactive programming dates back to the 1960s, and in 2014 the Reactive Manifesto was published, laying out the need for and the principles of Reactive Programming.

RxJS: An Introduction

RxJS is a JavaScript library that provides APIs to write reactive code in JavaScript.

Below are the terms that we will be using frequently:

Observable: An entity that our observer/subscriber monitors and reacts to. An example would be a data stream or a sequence of events.

Subscriber: An entity that reacts to the values emitted by the observable.

Subjects: A special class of observables that are also subscribers and that allow multiple subscribers to subscribe to them. This is similar to the broadcast model.

Operators: Functions that allow us to create, manage and transform observables. Some examples are map, filter and reduce.

Marble Diagrams

To allow us to better visualize the way data flows and changes through the RxJS API we will use "Marble Diagrams".

Marble Diagram 1

In the above diagram, the right-pointing arrow represents the timeline of the data stream, running from past to future. The circles (marbles) represent events that occur at a particular point in time. The vertical bar '|' represents the end of the stream and a successful completion, while an 'X' signifies an error that occurred in the stream and terminated its execution.

The block represents the function being applied to the stream. This function is invoked whenever the stream emits a new element. The return type of this function is also a stream, so we can chain multiple functions one after the other.
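Marble diagrams can also be written as text. The sketch below is a plain JavaScript illustration, not an RxJS API: the parseMarbles helper is hypothetical, and it assumes each character stands for one 10 ms frame, with '-' meaning nothing happened, '|' meaning completion and 'X' meaning an error, following the symbols described above.

```javascript
// Hypothetical helper: turns a marble string into a list of timed events.
// Assumption: every character represents one frame of 10 ms.
function parseMarbles(marbles, frameMs = 10) {
  const events = [];
  [...marbles].forEach((char, index) => {
    const time = index * frameMs;
    if (char === '-') return; // nothing happens in this frame
    if (char === '|') events.push({ time, type: 'complete' });
    else if (char === 'X') events.push({ time, type: 'error' });
    else events.push({ time, type: 'next', value: char });
  });
  return events;
}

const events = parseMarbles('--a--b-c|');
console.log(events);
```

Reading the string left to right matches reading the diagram's arrow from past to future.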

Observable

Observables are objects that allow a subscriber to subscribe to them and that emit values for the subscriber to act on. In a reactive programming environment, the observable is responsible for pushing changes and data to the subscribers.

There are generally two types of observables:

  1. Hot: Starts emitting values as soon as it is created, whether or not anyone has subscribed.
  2. Cold: Starts emitting values only after a subscriber subscribes to it.

Let's take a look at the following code:

const { Observable } = require('rxjs');

const subscribe = (subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.next(5);
  subscriber.next(true);
  subscriber.complete();
};

const observable = new Observable(subscribe);

const subscriber = {
  next: (value) => console.log(`Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

observable.subscribe(subscriber);
console.log('Subscribed');

The above code produces the following output:

Emitted: Hello
Emitted: World
Emitted: 5
Emitted: true
End of the stream
Subscribed

First we create an Observable using the Observable constructor, which takes a subscribe function as its parameter. This subscribe function itself has a parameter called subscriber. The subscriber has 3 methods: next, complete and error. The next method emits a value to the subscriber, the complete method signals that we have reached the end of the stream, and the error method reports an error raised by the observable.

Notice that we emit values of multiple data types. In the above instance, the first two values are strings, the third is a number and the final value is a boolean. So an observable is not bound to any particular data type.

We then created a subscriber object, which subscribes to the Observable instance to process the emitted values. The subscribe method of an Observable instance takes an object as a parameter. The object again consists of three key-value pairs: next, complete and error. The values for the keys are functions.

The calls to subscriber.next() and subscriber.complete() in the observable instance merely execute the code we have written for the next and complete keys in our subscriber object.

Now let's simulate an error in the stream.

const { Observable } = require('rxjs');

const subscribe = (subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.error(new Error('Some error in stream'));
  subscriber.next(5);
  subscriber.next(true);
};

const observable = new Observable(subscribe);

const subscriber = {
  next: (value) => console.log(`Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
  error: (err) => console.error(`${err}`),
};

observable.subscribe(subscriber);
console.log('Subscribed');

The above code will produce the following output:

Emitted: Hello
Emitted: World
Error: Some error in stream
Subscribed

Note that both 5 and true are missing from the output. This is because once the subscriber encounters an error or complete signal, it stops processing the stream.

Whatever we wrote above is actually synchronous and not asynchronous. Let's modify it a little bit to make it asynchronous.

const { Observable } = require('rxjs');

const subscribe = (subscriber) => {
  setTimeout(() => subscriber.next('Hello'), 1000);
  setTimeout(() => subscriber.next('World'), 3000);
  setTimeout(() => subscriber.next(5), 5000);
  setTimeout(() => subscriber.next(true), 7000);
  setTimeout(() => subscriber.complete(), 9000);
};

const observable = new Observable(subscribe);

const subscriber = {
  next: (value) => console.log(`Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

observable.subscribe(subscriber);
console.log('Subscribed');

Now we get the following output. 'Subscribed' is printed immediately, the first emitted value follows after 1 second, and each subsequent line is printed 2 seconds after the previous one.

Subscribed
Emitted: Hello
Emitted: World
Emitted: 5
Emitted: true
End of the stream

Notice that 'Subscribed' is printed to the console first, because our observable emits its values only after a delay. Every time subscriber.next() is called inside a setTimeout, it triggers the next function of our subscriber and prints the value. Finally, we trigger subscriber.complete().

Last but not least, we can create an observable in many ways; more details can be found here.

Congratulations, we have written our first code in RxJS. Let's go ahead and explore other features of the library.

RxJS Operators

Operators in RxJS are pure functions that transform, process and manage the data they receive from the observable. For example, map, filter and reduce are some operators.

const { from } = require('rxjs');
const { map, take, filter } = require('rxjs/operators');

function* generateUserEverySecond() {
  let previous = Date.now(); // declared with let to avoid creating an implicit global
  while (true) {
    if (Date.now() - previous >= 1000) {
      previous = Date.now();
      yield {
        createdDate: Date.now(),
        updateDate: Date.now(),
        userId: Math.floor(Math.random() * 100000),
        numberOfLikes: Math.floor(Math.random() * 100),
      };
    }
  }
}

const observable = from(generateUserEverySecond())
  .pipe(
    map((x) => ({ user: x.userId, likes: x.numberOfLikes })),
    filter((x) => x.likes > 80),
    take(5)
  );

const subscriber = {
  next: (value) => console.log(`Emitted: ${JSON.stringify(value)}`),
  complete: () => console.log('End of the stream'),
};

observable.subscribe(subscriber);

Output:

Emitted: {"user":48219,"likes":93}
Emitted: {"user":7996,"likes":90}
Emitted: {"user":39907,"likes":82}
Emitted: {"user":53731,"likes":96}
Emitted: {"user":53499,"likes":84}
End of the stream

The from function is used to convert a Promise, an Iterable or an Array into an Observable. The generateUserEverySecond generator yields an object every second.

We have used the pipe function to apply the operators, and each operator is called whenever a new value arrives. The pipe function acts as a pipeline, and each operator can be thought of as a gate in the pipeline that controls how the data flows and changes along the way.
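Conceptually, a pipeline of this kind is left-to-right function composition. The sketch below shows the idea in plain JavaScript over arrays; the pipeline, mapItems, filterItems and takeItems helpers are hypothetical names for illustration and are not part of RxJS, whose operators handle values one at a time as they arrive rather than whole arrays.

```javascript
// Hypothetical helper: compose steps left to right, like gates in a pipeline.
const pipeline = (...steps) => (input) =>
  steps.reduce((acc, step) => step(acc), input);

// Each "operator" takes its configuration and returns an array-to-array function.
const mapItems = (fn) => (items) => items.map(fn);
const filterItems = (predicate) => (items) => items.filter(predicate);
const takeItems = (count) => (items) => items.slice(0, count);

const topLikes = pipeline(
  mapItems((x) => ({ user: x.userId, likes: x.numberOfLikes })),
  filterItems((x) => x.likes > 80),
  takeItems(2)
);

const result = topLikes([
  { userId: 1, numberOfLikes: 95 },
  { userId: 2, numberOfLikes: 40 },
  { userId: 3, numberOfLikes: 88 },
  { userId: 4, numberOfLikes: 99 },
]);

console.log(result);
```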

We have applied three operators to our data: map, filter and take. The map operator transforms the input object into another object and passes it as input to the next operator. The filter operator picks only those objects that have more than 80 likes. Finally, the take operator tells the observable to signal complete after we obtain 5 objects as output, meaning our stream ends after the filter operator has let 5 objects through.

Let's visualize our code with a marble diagram to see the data flow and changes.

Marble Diagram 2

Tip 1: Before writing reactive code, try to chart out a marble diagram so you know exactly what you need to do at each step.

Tip 2: In case you are stuck wondering which operator to use, try the operator decision tree tool in RxJS. It can be found here.

Subjects

A plain observable is unicast: every subscriber triggers its own independent execution of the observable, so subscribers do not share a single stream of values. This doesn't bode well for use cases where multiple subscribers need to receive the same events, for example broadcasting data.

Subjects are special observables that can also act as subscribers, meaning they can be used to subscribe to another Observable.

const { Subject } = require('rxjs');

const subject = new Subject();

const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberA);
subject.subscribe(subscriberB);

subject.next('Foo');
subject.next('Bar')
subject.complete();

In the above code the subject acts as an Observable, i.e. it only emits values, so the output is similar to that of our synchronous Observable. However, we have two subscribers, A and B, attached to the subject, so both are invoked every time the subject emits a value.

Subscriber A Emitted: Foo
Subscriber B Emitted: Foo
Subscriber A Emitted: Bar
Subscriber B Emitted: Bar
End of the stream
End of the stream

Let's tweak the above code and see what happens.

const { Subject } = require('rxjs');

const subject = new Subject();

const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

setTimeout(() => {
  subject.subscribe(subscriberA);
}, 2000);

setTimeout(() => {
  subject.subscribe(subscriberB);
}, 4000);

setTimeout(() => {
  subject.next('Foo');
}, 1000);
setTimeout(() => {
  subject.next('Bar')
}, 3000);
setTimeout(() => {
  subject.next('Baz')
}, 5000);

setTimeout(() => {
  subject.complete();
}, 7000);

Now the output becomes.

Subscriber A Emitted: Bar
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream

Subscriber A caught every value starting from Bar, while Subscriber B caught every value starting from Baz, but neither caught Foo. That's because, unlike plain observables, Subjects are hot observables, so they don't wait for a subscriber before emitting values.

Now let's take a look at an example where a subject acts as a subscriber.

const { Subject, Observable } = require('rxjs');

const observable = new Observable((subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.next(5);
  subscriber.next(true);
  subscriber.complete();
});

const subject = new Subject();

const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberA);
subject.subscribe(subscriberB);

observable.subscribe(subject);

The output will be

Subscriber A Emitted: Hello
Subscriber B Emitted: Hello
Subscriber A Emitted: World
Subscriber B Emitted: World
Subscriber A Emitted: 5
Subscriber B Emitted: 5
Subscriber A Emitted: true
Subscriber B Emitted: true
End of the stream
End of the stream

In the above example we have created an Observable, a Subject and two subscribers, A and B. The two subscribers subscribe to the subject, while the subject itself subscribes to the observable. So when the observable emits a value, it triggers the subject, which emits the same value to all of its subscribers. This is a way of converting a Cold Observable to a Hot Observable.

Types of Subjects

There are 4 kinds of Subjects in RxJS:

  1. Subject (called PublishSubject in some other ReactiveX implementations): It emits to a subscriber only those values that are emitted after the subscriber subscribes. This is what we get when we import Subject from RxJS.
  2. ReplaySubject: It replays the values that were emitted before the subscriber subscribed, in their original order, and then continues with new values. By default it replays all earlier values; a buffer size can limit how many.
  3. BehaviorSubject: It holds a current value. When a subscriber subscribes, it immediately receives the latest value emitted before the subscription (or the initial value if nothing has been emitted yet), followed by all later values.
  4. AsyncSubject: It emits only the last value received from the source, and only when the stream completes.

The marble diagrams and a more in-depth explanation of these types can be found here.

Replay Subject

const { ReplaySubject } = require('rxjs');

const subject = new ReplaySubject();

const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberA);

subject.next('Foo');
subject.next('Bar');

const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberB);

subject.next('Baz');
subject.complete();

Output:

Subscriber A Emitted: Foo
Subscriber A Emitted: Bar
Subscriber B Emitted: Foo
Subscriber B Emitted: Bar
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream

Since the values Foo and Bar were emitted before subscriber B subscribed, the subject replays those earlier values to subscriber B before emitting the next value, Baz.

Behavior Subject

const { BehaviorSubject } = require('rxjs');

const subject = new BehaviorSubject();

const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberA);

subject.next('Foo');
subject.next('Bar');

const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberB);

subject.next('Baz');
subject.complete();

The output of the above code is

Subscriber A Emitted: undefined
Subscriber A Emitted: Foo
Subscriber A Emitted: Bar
Subscriber B Emitted: Bar
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream

There are two things to notice in the output. The first line shows the emitted value as undefined. A BehaviorSubject emits its current value to every new subscriber, and since we created the subject without an initial value and nothing had been emitted before subscriber A subscribed, that current value was undefined. Meanwhile, Bar was the last value emitted before subscriber B subscribed, so the subject emitted Bar to B on subscription before proceeding to emit later values to all subscribers.

Async Subject

const { AsyncSubject } = require('rxjs');

const subject = new AsyncSubject();

const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberA);

subject.next('Foo');
subject.next('Bar');

const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};

subject.subscribe(subscriberB);

subject.next('Baz');
subject.complete();

Output:

Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream

Since AsyncSubject emits only the last value, and only once the subject completes, the subscribers were invoked for Baz alone, regardless of when they subscribed.

Use Cases

Now that we've gone over the basics of the library, let's look at the use cases that can benefit from RxJS.

DOM Events

We can handle various DOM events such as change and click using RxJS to streamline the event handling process. Let's take a look at one such example of processing an input value.

Consider an input element with the id reactive-input in our DOM. We will write reactive code that converts the value to upper case every time a user types in the input field.

const { fromEvent } = rxjs;
const { map } = rxjs.operators;

const eventObservable = fromEvent(document.getElementById('reactive-input'), 'input')
  .pipe(
      map((x) => x.target.value),
      map((x) => x.toUpperCase())
  )

eventObservable.subscribe(x => console.log(x));

In the above code we listen to the input event of the reactive-input field by converting the event to an Observable using fromEvent, and we apply map twice: first to extract the value that was typed, and second to convert it to upper case. Finally, we subscribe to the observable and print the value.

This is a simple use case; however, we can orchestrate a whole lot of tasks, for example fetching and processing an API response at the click of a button. The most famous use case that can be solved easily with RxJS is the notorious drag-and-drop.

API Processing

There are a whole bunch of API-related use cases where using RxJS brings a lot of benefit.

Response Processing

Most of the time we have to process the API response before using the data. The processing logic can get complex, but it can often be broken down into an aggregation of simple tasks. Let's take a look at one simple example.

const { from } = require('rxjs');
const { filter, flatMap, distinct } = require('rxjs/operators');

const observable = from(fetch('<api>'))
  .pipe(
    flatMap(res => res.json()), // parse the body; assumed to be a JSON array
    flatMap(x => x),            // emit each element of the array individually
    filter(x => x.tags.length > 3),
    flatMap(x => x.tags),
    distinct(),
  );

const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
}

observable.subscribe(subscriber);

Our goal is to find the distinct tags across all objects in the response that have more than 3 tags, where the response is an array of objects that each contain tags. The first flatMap parses the response body as JSON, and the second splits the resulting array into individual elements. We then use filter to keep only the objects with more than 3 tags, and use flatMap again to flatten their tag lists into individual tags. Finally, we apply distinct to drop duplicate tags. Note that flatMap is an alias of mergeMap, which is the preferred name in recent RxJS versions.

Getting the fastest API result

You may have come across a scenario where you need to call multiple APIs and process whichever result comes back first. We can achieve this using RxJS with just a slight modification of the previous code.

const { race } = require('rxjs');
const { filter, flatMap, distinct } = require('rxjs/operators');

const observable = race(fetch('<api1>'), fetch('<api2>'))
  .pipe(
    flatMap(res => res.json()), // parse the body of whichever response won
    flatMap(x => x),
    filter(x => x.tags.length > 3),
    flatMap(x => x.tags),
    distinct(),
  );

const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
}

observable.subscribe(subscriber);

We use the race function to create a race between the two requests. race mirrors whichever source emits first and ignores the results of the rest.

API Retry

More often than not, we encounter a scenario where we have to retry an API call a certain number of times in case of an error or because of some event, and yes, RxJS simplifies this for us as well.

const { defer } = require('rxjs');
const { filter, flatMap, distinct, retry } = require('rxjs/operators');

// defer issues a fresh fetch call on every (re)subscription; wrapping an
// already-started promise with from() would only replay the same rejection.
const observable = defer(() => fetch('<api>'))
  .pipe(
    flatMap(res => res.json()),
    flatMap(x => x),
    filter(x => x.tags.length > 3),
    flatMap(x => x.tags),
    distinct(),
    retry(5),
  );

const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
}

observable.subscribe(subscriber);

In the above example, we use the retry operator to retry five times before giving up and throwing the error. So subscriber.error is called only after retrying five times.

Suppose we want to fall back to another API on failure. The retryWhen operator only controls when the same source is retried, so for a fallback we can use the catchError operator as follows.

const { defer } = require('rxjs');
const { filter, flatMap, distinct, catchError } = require('rxjs/operators');

const observable = defer(() => fetch('<api1>'))
  .pipe(
    catchError((err) => {
      // Possibly can check the type of error before falling back.
      return defer(() => fetch('<api2>'));
    }),
    flatMap(res => res.json()),
    flatMap(x => x),
    filter(x => x.tags.length > 3),
    flatMap(x => x.tags),
    distinct(),
  );

const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
}

observable.subscribe(subscriber);

Apart from the use cases above, we can also use RxJS with React or Vue to manage state and data, and for stream processing, data broadcasting, creating data streams and more.

Conclusion

RxJS is a very powerful library that provides easy, readable and maintainable solutions to many problems that we currently face with Promises and other asynchronous tasks in JavaScript. Many leading companies like Netflix, Airbnb and GitHub use ReactiveX libraries. Give it a try and play around with it. Who knows, you might just discover a new and easier solution to one of your use cases.

Posted on May 20 by:

Adarsh

@sadarshannaiynar

A Software Engineer with the mind of a curious kid who likes to explore and deep dive into frameworks, libraries and computers in general.
