RxJS (Reactive Extensions for JavaScript) is a library for asynchronous programming built on the principles of Reactive Programming. It borrows concepts from functional programming and from design patterns such as Observer, Publisher-Subscriber and Broadcast.
What is Reactive Programming?
Reactive Programming is a paradigm, like Object-Oriented Programming, built on the idea that our code should react whenever its input values change. It is useful for handling large volumes of asynchronous data such as streams and events, and for propagating the changes that follow when an event is triggered.
You might be thinking, "Reactive programming sounds a lot like what we already do when we code asynchronous tasks". It does, but the code we write to manage asynchronous calls can get messy and error-prone as the application scales up. A piece of asynchronous code can quickly turn into a nightmare for developers when it is not handled properly.
Reactive Programming lays out practices and principles that allow developers to manage asynchronous calls while keeping the code readable, pure and less error-prone. The idea dates back to the 1960s, and in 2014 the Reactive Manifesto was published, laying out the need for and the principles of Reactive Programming.
RxJS: An Introduction
RxJS is a JavaScript library that provides APIs to write reactive code in JavaScript.
Below are the terms we will be using frequently:
Observable: An entity that our observer/subscriber monitors and reacts to. An example would be a data stream or a sequence of events.
Subscriber: An entity that reacts to the values emitted by the observable.
Subjects: A special class of observables that are also subscribers and that deliver each value to all of their subscribers. This is similar to the broadcast model.
Operators: Functions that let us create, manage and transform observables. Some examples are map, filter and reduce.
Marble Diagrams
To better visualize how data flows and changes through the RxJS API, we will use "Marble Diagrams".
In a marble diagram, the right-pointing arrow represents the timeline of the data stream, running from past to future. The circles (marbles) represent the events that occur at particular points in time. The vertical bar '|' marks the end of the stream and its successful completion, while an 'X' marks an error in the stream, which also terminates it.
The block represents the function being applied to the stream; it is invoked whenever the stream emits a new element. The return type of this function is also a stream, so we can chain multiple functions one after the other.
Observable
Observables are objects that a subscriber can subscribe to and that emit values for the subscriber to act on. In a reactive programming environment, the observable is responsible for pushing changes and data to its subscribers.
There are generally two types of observables:
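- Hot: Emits values whether or not anyone is subscribed, so a subscriber only sees the values emitted after it subscribes.
- Cold: Starts emitting values only once a subscriber subscribes to it, and every subscriber gets its own run of the values.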
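The difference can be pictured without any library. The following is a hand-rolled illustration, not RxJS code; the names coldSource, hotSource and listeners are made up for this sketch. The cold source runs its producer once per subscriber, while the hot source pushes values to whoever happens to be listening at that moment.

```javascript
// Cold: the producer runs from scratch for every subscriber.
let coldRuns = 0;
const coldSource = (listener) => {
  coldRuns += 1;
  listener('a');
  listener('b');
};

// Hot: values are pushed to whoever is listening at that moment.
const listeners = [];
const hotSource = {
  subscribe: (listener) => listeners.push(listener),
  emit: (value) => listeners.forEach((listener) => listener(value)),
};

const coldLog = [];
coldSource((value) => coldLog.push(`first:${value}`));
coldSource((value) => coldLog.push(`second:${value}`));

const hotLog = [];
hotSource.emit('a'); // nobody is subscribed yet, so 'a' is lost
hotSource.subscribe((value) => hotLog.push(value));
hotSource.emit('b');

console.log(coldRuns); // 2
console.log(coldLog); // [ 'first:a', 'first:b', 'second:a', 'second:b' ]
console.log(hotLog); // [ 'b' ]
```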
Let's take a look at the following code:
const { Observable } = require('rxjs');
const subscribe = (subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.next(5);
  subscriber.next(true);
  subscriber.complete();
};
const observable = new Observable(subscribe);
const subscriber = {
  next: (value) => console.log(`Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
observable.subscribe(subscriber);
console.log('Subscribed');
The above code produces the following output:
Emitted: Hello
Emitted: World
Emitted: 5
Emitted: true
End of the stream
Subscribed
First we create an Observable using the Observable constructor, which takes a subscribe function as its parameter. This subscribe function itself has a parameter called subscriber. The subscriber exposes three methods: next, complete and error. The next method emits a value to the subscriber, complete signals that we have reached the end of the stream, and error reports an error raised by the observable.
Notice that we emit values of several data types. In the above instance the first two values are strings, the third is a number and the last is a boolean. So an observable is not bound to any particular data type.
We then create a subscriber object, which subscribes to the Observable instance to process the emitted values. The subscribe method of an Observable instance takes an object as a parameter. The object again consists of up to three key-value pairs: next, complete and error. The values for the keys are functions.
Calling subscriber.next() and subscriber.complete() inside the observable simply executes the code we have written for the next and complete keys of our subscriber object.
Now let's simulate an error in the stream.
const { Observable } = require('rxjs');
const subscribe = (subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.error(new Error('Some error in stream'));
  subscriber.next(5);
  subscriber.next(true);
};
const observable = new Observable(subscribe);
const subscriber = {
  next: (value) => console.log(`Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
  error: (err) => console.error(`${err}`),
};
observable.subscribe(subscriber);
console.log('Subscribed');
The above code will produce the following output:
Emitted: Hello
Emitted: World
Error: Some error in stream
Subscribed
Note that both 5 and true are missing from the output. This is because once the subscriber receives an error or complete signal, it stops processing the stream.
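This stop-after-termination rule can be pictured with a small hand-written model. The sketch below is not how RxJS is implemented; MiniObservable is a hypothetical class written only for this illustration. It wraps the observer so that nothing is delivered once error or complete has been called.

```javascript
class MiniObservable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    let closed = false;
    // Wrap the observer so nothing is delivered after error() or complete().
    const safe = {
      next: (value) => {
        if (!closed && observer.next) observer.next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        if (observer.error) observer.error(err);
      },
      complete: () => {
        if (closed) return;
        closed = true;
        if (observer.complete) observer.complete();
      },
    };
    this.producer(safe);
  }
}

const received = [];
new MiniObservable((subscriber) => {
  subscriber.next('Hello');
  subscriber.error(new Error('Some error in stream'));
  subscriber.next(5); // ignored: the subscription is already closed
  subscriber.complete(); // ignored as well
}).subscribe({
  next: (value) => received.push(`next:${value}`),
  error: (err) => received.push(`error:${err.message}`),
  complete: () => received.push('complete'),
});

console.log(received); // [ 'next:Hello', 'error:Some error in stream' ]
```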
Whatever we wrote above is actually synchronous and not asynchronous. Let's modify it a little bit to make it asynchronous.
const { Observable } = require('rxjs');
const subscribe = (subscriber) => {
  setTimeout(() => subscriber.next('Hello'), 1000);
  setTimeout(() => subscriber.next('World'), 3000);
  setTimeout(() => subscriber.next(5), 5000);
  setTimeout(() => subscriber.next(true), 7000);
  setTimeout(() => subscriber.complete(), 9000);
};
const observable = new Observable(subscribe);
const subscriber = {
  next: (value) => console.log(`Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
observable.subscribe(subscriber);
console.log('Subscribed');
Now we get the following output. 'Subscribed' is printed immediately, 'Emitted: Hello' follows after one second, and each remaining line appears two seconds after the previous one.
Subscribed
Emitted: Hello
Emitted: World
Emitted: 5
Emitted: true
End of the stream
Notice that 'Subscribed' is printed to the console first, because our observable now emits its values after a delay. Every time subscriber.next() is called inside a setTimeout, it triggers the next function of our subscriber and prints the value. Finally, we trigger subscriber.complete().
Last but not least, an observable can be created in many ways; more details can be found here.
Congratulations, we have written our first code in RxJS. Let's go ahead and explore other features of the library.
RxJS Operators
Operators in RxJS are pure functions that transform, process and manage the data they receive from an observable. For example, map, filter and take are operators.
const { from } = require('rxjs');
const { map, take, filter } = require('rxjs/operators');
// Yields a random user object roughly once per second.
// Note: the loop busy-waits, so it blocks the event loop between values.
function* generateUserEverySecond() {
  let previous = Date.now();
  while (true) {
    if (Date.now() - previous >= 1000) {
      previous = Date.now();
      yield {
        createdDate: Date.now(),
        updateDate: Date.now(),
        userId: Math.floor(Math.random() * 100000),
        numberOfLikes: Math.floor(Math.random() * 100),
      };
    }
  }
}
const observable = from(generateUserEverySecond())
  .pipe(
    map((x) => ({ user: x.userId, likes: x.numberOfLikes })),
    filter((x) => x.likes > 80),
    take(5)
  );
const subscriber = {
  next: (value) => console.log(`Emitted: ${JSON.stringify(value)}`),
  complete: () => console.log('End of the stream'),
};
observable.subscribe(subscriber);
Output:
Emitted: {"user":48219,"likes":93}
Emitted: {"user":7996,"likes":90}
Emitted: {"user":39907,"likes":82}
Emitted: {"user":53731,"likes":96}
Emitted: {"user":53499,"likes":84}
End of the stream
The from function is used to convert a Promise, an Iterable or an Array into an Observable. The generateUserEverySecond generator yields an object every second.
We have used the pipe function to apply the operators, and each operator is called whenever new data arrives. The pipe function acts as a pipeline, and each operator can be thought of as a gate in the pipeline that controls how the data flows and changes along the way.
We have applied three operators to our data: map, filter and take. The map function transforms the input object into another object and passes it on as the input to the next operator. The filter function picks only those objects whose likes are greater than 80. Finally, the take function tells the observable to signal complete after 5 objects have been obtained, meaning our stream ends after the filter operator has let 5 objects through.
Let's visualize our code with a marble diagram to see the data flow and changes.
Tip 1: Before writing reactive code, try to chart out a marble diagram so you know exactly what you need to do at each step.
Tip 2: In case you are stuck wondering which operator to use, try the operator decision tree tool in RxJS. It can be found here.
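To see how a pipeline of operators fits together, it can help to model it by hand. The sketch below is a simplified illustration and not RxJS itself: every "observable" is just a function that takes an observer, each operator takes one such function and returns another, and pipe chains them. The names fromArray and pipeline, and the sample users, are made up for this example, and unlike the real take this version does not stop the upstream producer.

```javascript
// Each "observable" here is just a function: (observer) => void.
const fromArray = (items) => (observer) => {
  items.forEach((item) => observer.next(item));
  observer.complete();
};

// An operator takes a source and returns a new source.
const map = (project) => (source) => (observer) =>
  source({
    next: (value) => observer.next(project(value)),
    complete: () => observer.complete(),
  });

const filter = (predicate) => (source) => (observer) =>
  source({
    next: (value) => {
      if (predicate(value)) observer.next(value);
    },
    complete: () => observer.complete(),
  });

// Assumes count >= 1. Values arriving after the limit are simply ignored.
const take = (count) => (source) => (observer) => {
  let seen = 0;
  let done = false;
  source({
    next: (value) => {
      if (done) return;
      seen += 1;
      observer.next(value);
      if (seen >= count) {
        done = true;
        observer.complete();
      }
    },
    complete: () => {
      if (!done) {
        done = true;
        observer.complete();
      }
    },
  });
};

// pipe() feeds the output of each operator into the next one.
const pipe = (source, ...operators) => operators.reduce((acc, op) => op(acc), source);

const users = [
  { userId: 1, numberOfLikes: 90 },
  { userId: 2, numberOfLikes: 10 },
  { userId: 3, numberOfLikes: 85 },
  { userId: 4, numberOfLikes: 99 },
];

const pipeline = pipe(
  fromArray(users),
  map((x) => ({ user: x.userId, likes: x.numberOfLikes })),
  filter((x) => x.likes > 80),
  take(2)
);

const log = [];
pipeline({
  next: (value) => log.push(`next:${value.user}`),
  complete: () => log.push('complete'),
});

console.log(log); // [ 'next:1', 'next:3', 'complete' ]
```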
Subjects
A plain observable is unicast: every subscriber triggers its own independent execution of the observable, so subscribers do not share the same emissions. This doesn't bode well for use cases where multiple subscribers need to react to the same event, for example broadcasting data.
Subjects are special observables that deliver each value to all of their subscribers and that can also act as subscribers, meaning they can be used to subscribe to another Observable.
const { Subject } = require('rxjs');
const subject = new Subject();
const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberA);
subject.subscribe(subscriberB);
subject.next('Foo');
subject.next('Bar');
subject.complete();
In the above code the subject acts as an Observable, i.e. it only emits values. So the output is similar to what we saw for the synchronous Observable. However, we have two subscribers, A and B, attached to the subject, so both get invoked every time the subject emits a value.
Subscriber A Emitted: Foo
Subscriber B Emitted: Foo
Subscriber A Emitted: Bar
Subscriber B Emitted: Bar
End of the stream
End of the stream
Let's tweak the above code and see what happens.
const { Subject } = require('rxjs');
const subject = new Subject();
const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
setTimeout(() => {
  subject.subscribe(subscriberA);
}, 2000);
setTimeout(() => {
  subject.subscribe(subscriberB);
}, 4000);
setTimeout(() => {
  subject.next('Foo');
}, 1000);
setTimeout(() => {
  subject.next('Bar');
}, 3000);
setTimeout(() => {
  subject.next('Baz');
}, 5000);
setTimeout(() => {
  subject.complete();
}, 7000);
Now the output becomes:
Subscriber A Emitted: Bar
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream
Subscriber A caught every value starting from Bar, while Subscriber B caught every value starting from Baz, but neither of them caught the value Foo. That's because, unlike the observables we created earlier, Subjects are Hot Observables, so they don't wait for a subscriber before emitting values.
Now let's take a look at an example where a subject acts as a subscriber.
const { Subject, Observable } = require('rxjs');
const observable = new Observable((subscriber) => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.next(5);
  subscriber.next(true);
  subscriber.complete();
});
const subject = new Subject();
const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberA);
subject.subscribe(subscriberB);
observable.subscribe(subject);
The output will be
Subscriber A Emitted: Hello
Subscriber B Emitted: Hello
Subscriber A Emitted: World
Subscriber B Emitted: World
Subscriber A Emitted: 5
Subscriber B Emitted: 5
Subscriber A Emitted: true
Subscriber B Emitted: true
End of the stream
End of the stream
In the above example we have created an Observable, a Subject and two subscribers, A and B. The two subscribers subscribe to subject, while the subject itself subscribes to observable. So when the observable emits a value, it triggers the subject, which emits the same value to all of its subscribers. This is a way of converting a Cold Observable to a Hot Observable.
Types of Subjects
There are 4 kinds of Subjects in RxJS:
- Subject: Emits to a subscriber only those values that are emitted after the subscriber subscribes to the subject. Other ReactiveX implementations, such as RxJava, call this a PublishSubject; in RxJS it is exported simply as Subject.
- ReplaySubject: Replays the values that were emitted before the subscriber subscribed, in their original order, and then continues with new values for all subscribers. By default it replays every earlier value; a buffer size passed to the constructor limits how many.
- BehaviorSubject: Holds a current value. When a subscriber subscribes, it immediately receives the latest value emitted before the subscription (or the initial value if nothing has been emitted yet), followed by every later value.
- AsyncSubject: Emits only the last value from the source, and only when the stream completes.
The marble diagram and more in-depth explanation of these types can be found here.
Replay Subject
const { ReplaySubject } = require('rxjs');
const subject = new ReplaySubject();
const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberA);
subject.next('Foo');
subject.next('Bar');
const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberB);
subject.next('Baz');
subject.complete();
Output:
Subscriber A Emitted: Foo
Subscriber A Emitted: Bar
Subscriber B Emitted: Foo
Subscriber B Emitted: Bar
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream
Since the values Foo and Bar were emitted before SubscriberB subscribed to the subject, it replays those values to SubscriberB before emitting the next value, Baz.
Behavior Subject
const { BehaviorSubject } = require('rxjs');
const subject = new BehaviorSubject();
const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberA);
subject.next('Foo');
subject.next('Bar');
const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberB);
subject.next('Baz');
subject.complete();
The output of the above code is
Subscriber A Emitted: undefined
Subscriber A Emitted: Foo
Subscriber A Emitted: Bar
Subscriber B Emitted: Bar
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream
There are two things to notice in the output. First, the first line shows the emitted value as undefined. This is because a BehaviorSubject always holds a current value and hands it to every new subscriber. That value is seeded by the constructor argument; we passed none, so the current value was still undefined when subscriber A subscribed. Second, the value Bar was the last value emitted before subscriber B subscribed. So when B subscribed, the subject emitted Bar to B before proceeding to emit later values to all subscribers.
Async Subject
const { AsyncSubject } = require('rxjs');
const subject = new AsyncSubject();
const subscriberA = {
  next: (value) => console.log(`Subscriber A Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberA);
subject.next('Foo');
subject.next('Bar');
const subscriberB = {
  next: (value) => console.log(`Subscriber B Emitted: ${value}`),
  complete: () => console.log('End of the stream'),
};
subject.subscribe(subscriberB);
subject.next('Baz');
subject.complete();
Output:
Subscriber A Emitted: Baz
Subscriber B Emitted: Baz
End of the stream
End of the stream
Since an AsyncSubject emits only the last value, and only once the stream completes, the subscribers were invoked for the value Baz and for no other value, regardless of when they subscribed.
Use Cases
We've gone over the basics of the library. Now let's look at some use cases that can benefit from RxJS.
DOM Events
We can handle various DOM events, such as change and click, using RxJS to streamline event handling. Let's take a look at one such example that processes an input value.
Consider an input element with the id reactive-input in our DOM. We will write reactive code that converts the value to upper case every time the user types in the input field.
// Assumes RxJS is loaded in the page as the global rxjs object (browser bundle).
const { fromEvent } = rxjs;
const { map } = rxjs.operators;
const eventObservable = fromEvent(document.getElementById('reactive-input'), 'input')
  .pipe(
    map((x) => x.target.value),
    map((x) => x.toUpperCase())
  );
eventObservable.subscribe((x) => console.log(x));
In the above code we listen to the input event of the reactive-input field, convert the event to an Observable using fromEvent, and apply map twice: first to extract the value that was typed, and then to convert it to upper case. Finally, we subscribe to the observable and print the value.
This is a simple use case, but we can orchestrate far more, for example fetching and processing an API response at the click of a button. The best-known use case that RxJS makes easy is the notorious drag-and-drop.
API Processing
There are a whole bunch of API-related use cases where using RxJS brings a lot of benefit.
Response Processing
Most of the time we have to process an API response before we can use the data. The processing logic can get complex, but it is often just an aggregation of simple tasks. Let's take a look at one simple example.
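const { from } = require('rxjs');
const { filter, flatMap, distinct } = require('rxjs/operators');
// fetch() resolves with a Response object, so the JSON body is parsed first.
// '<api>' is a placeholder for an endpoint returning an array of objects with a tags array.
const observable = from(fetch('<api>').then((res) => res.json()))
  .pipe(
    flatMap((x) => x), // flatMap is the older alias of mergeMap
    filter((x) => x.tags.length > 3),
    flatMap((x) => x.tags),
    distinct(),
  );
const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
};
observable.subscribe(subscriber);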
Our goal is to find the distinct tags across all objects that have more than 3 tags, given a response that is an array of objects, each containing a tags array. We first split the response array into individual elements using the first flatMap. Then we keep only the objects that have more than 3 tags using filter, and use flatMap again to flatten the lists into individual tags. Finally, we apply distinct to get the distinct tags.
Getting the fastest API result
You may have come across a scenario where you need to hit multiple APIs and use the result of whichever one returns data first. We can achieve this with RxJS with just a slight modification to the previous code.
const { race } = require('rxjs');
const { filter, flatMap, distinct } = require('rxjs/operators');
// Each fetch() resolves with a Response object, so the JSON body is parsed first.
const observable = race(
  fetch('<api1>').then((res) => res.json()),
  fetch('<api2>').then((res) => res.json())
)
  .pipe(
    flatMap((x) => x),
    filter((x) => x.tags.length > 3),
    flatMap((x) => x.tags),
    distinct(),
  );
const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
};
observable.subscribe(subscriber);
We use the race function to create a race between the different sources. race mirrors whichever source emits a value first and ignores the rest.
API Retry
More often than not we encounter a scenario where we have to retry an API call a certain number of times, either because of an error or because of some event. RxJS simplifies this for us as well.
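const { defer } = require('rxjs');
const { filter, flatMap, distinct, retry } = require('rxjs/operators');
// defer() calls fetch() again on every (re)subscription, so each retry sends a new request.
// Wrapping an already created promise with from() would only replay the same failed result.
const observable = defer(() => fetch('<api>').then((res) => res.json()))
  .pipe(
    flatMap((x) => x),
    filter((x) => x.tags.length > 3),
    flatMap((x) => x.tags),
    distinct(),
    retry(5),
  );
const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
};
observable.subscribe(subscriber);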
In the above example we use the retry operator to retry five times before giving up and throwing the error. So subscriber.error is called only after five retries have failed.
Suppose we want to fall back to another API on failure. We can do so using the catchError operator, which replaces a failed source with another observable, as follows.
const { from } = require('rxjs');
const { filter, flatMap, distinct, catchError } = require('rxjs/operators');
const observable = from(fetch('<api1>').then((res) => res.json()))
  .pipe(
    catchError((err) => {
      // Possibly can check the type of error before falling back.
      return from(fetch('<api2>').then((res) => res.json()));
    }),
    flatMap((x) => x),
    filter((x) => x.tags.length > 3),
    flatMap((x) => x.tags),
    distinct(),
  );
const subscriber = {
  next: (x) => { console.log(x); },
  error: (err) => { console.error(err); },
};
observable.subscribe(subscriber);
Apart from the use cases mentioned above, we can also use RxJS with React or Vue to manage state and data, and for stream processing, data broadcasting, creating data streams and more.
Conclusion
RxJS is a very powerful library that provides easy, readable and maintainable solutions to many of the problems we currently face with Promises and other asynchronous tasks in JavaScript. Many leading companies, such as Netflix, Airbnb and GitHub, use ReactiveX libraries. Give it a try and play around with it. Who knows, you might just discover a new and easier solution to one of your use cases.