Reactive programming is an essential part of modern web applications. However, few popular programming languages come equipped with the reactive API by default. RxJS allows you to create reactive programs with JavaScript to better serve your users. RxJS is a library used to create asynchronous programs using observable sequences.
Today, we'll explore an overview of reactive programming and RxJS and walk you through a quick tutorial on how to implement all the fundamental components of RxJS in your apps.
We'll cover:
- What is reactive programming?
- What is RxJS?
- RxJS observables
- RxJS data pipeline
- RxJS creational operators
- RxJS pipe function and pipeable operators
- RxJS filtering operators
- What to learn next
What is reactive programming?
Almost every online application today generates large amounts of real-time, interactive data. Applications are expected to make changes across the app in response to events and remain fully functional during the process. The reactive paradigm was made to handle these "events" with real-time updates across the program.
Reactive programs are structured around events rather than sequential top-down execution of iterative code. This allows them to respond to a trigger event regardless of what stage the program is at.
Reactive programming is often combined with functional programming and concurrency to create stable, scalable, and event-driven programs.
Asynchronous vs Synchronous
One of the main concepts in reactive programming is synchronous vs asynchronous data. In short, synchronous data is delivered immediately: the program requests a value and waits until it receives it before moving on.
Asynchronous data is delivered later, once a set event occurs, usually through a "callback", and the program is free to do other work in the meantime. Asynchronous data is more popular in reactive programming because it fits well with the paradigm's event-based approach.
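To make the difference concrete, here is a minimal sketch in plain JavaScript. The names getValueSync and getValueAsync are hypothetical, and setTimeout only stands in for a real event such as a network response.

```javascript
// Collect messages so the order of delivery is easy to inspect.
const log = [];

// Synchronous: the value is returned before the next statement runs.
function getValueSync() {
  return 'sync value';
}

// Asynchronous: the value is handed to a callback later, after the
// current code has finished running. setTimeout stands in for a real event.
function getValueAsync(callback) {
  setTimeout(() => callback('async value'), 0);
}

log.push('start');
log.push(getValueSync());

getValueAsync(value => {
  log.push(value);
  console.log(log); // [ 'start', 'sync value', 'end', 'async value' ]
});

log.push('end');
```

Note that 'end' is recorded before 'async value': the program kept running while it waited for the asynchronous result.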
Advantages of reactive programming
The main advantage of reactive programming is that it allows a program to remain responsive to events regardless of the program's current task.
Other advantages include:
- Highly scalable
- Clean and readable
- Easy to add new event or response support
- Improved user experience due to little downtime
The reactive paradigm can also be combined with other paradigms to form a blend, such as object-oriented reactive programming (OORP) or functional reactive programming (FRP). This blendable quality makes reactive programming a versatile paradigm that can be altered to fit a variety of purposes.
What is RxJS?
The reactive paradigm is available for many languages through reactive extensions, or Rx-libraries. These libraries are downloadable APIs that add support for essential reactive tools like observers and reactive operators. With reactive extensions, developers can write reactive code in languages like JavaScript, Python, and C++, which are not reactive by default.
JavaScript's Rx-library is called RxJS. More specifically, RxJS is a functional reactive programming tool built on the observer pattern and the iterator pattern. It also includes adapted forms of JavaScript's array functions (reduce, map, etc.) to handle asynchronous events as collections.
RxJS has become very popular because it simplifies JavaScript's async implementation. Without extensions, asynchronous JavaScript code built from nested callbacks can become difficult to read and maintain. RxJS makes async more achievable with tools built specifically for reactive and asynchronous programming.
Many web application frameworks, like Angular, are based on RxJS structures. As a result, you may have indirectly used RxJS already!
Next, we'll break down the core components of RxJS and show you how to implement them in your own code.
RxJS Observables
Observables are parts of our program that generate data over time. An observable's data is a stream of values that can then be transmitted synchronously or asynchronously.
Consumers can then subscribe to observables to listen to all the data they transmit. Consumers can be subscribed to multiple observables at the same time. This data can then be transformed as it moves through the data pipeline toward the user.
Let's see how to create an observable!
const { Observable } = require('rxjs');

const wrapArrayIntoObservable = arr => {
  return new Observable(subscriber => {
    for (let item of arr) {
      subscriber.next(item); // emit each array item to the subscriber
    }
  });
}

const data = [1, 2, 3, 4, 5];
const observable = wrapArrayIntoObservable(data);

observable.subscribe(val => console.log('Subscriber 1: ' + val));
observable.subscribe(val => console.log('Subscriber 2: ' + val));
// Output:
Subscriber 1: 1
Subscriber 1: 2
Subscriber 1: 3
Subscriber 1: 4
Subscriber 1: 5
Subscriber 2: 1
Subscriber 2: 2
Subscriber 2: 3
Subscriber 2: 4
Subscriber 2: 5
We first create the wrapArrayIntoObservable() function, which takes an array as a parameter and wraps that array into an observable. Inside it, the function passed to the Observable constructor is run once for each subscriber and emits every array item with subscriber.next(). We then call wrapArrayIntoObservable(data) to create the observable. Finally, the two subscribe() calls each print the data stream they receive.
RxJS data pipeline
A data pipeline is a sequential series of transformations that all data in a stream passes through before it is presented to the user. These transformations can be applied to all data that passes through, for example, to make the stream more readable to the user.
There can also be optional transformations that only take effect under specific conditions, like filtering transformations. Data still passes through these optional stages, but the transformation is simply not applied when the condition is not met.
Each transformation can be thought of as a pipe segment that the data flow passes through.
Let's see how we can create a data pipeline that can deliver a raw output and an optional user-friendly output for multiple subscribers:
const { from } = require('rxjs');
const { tap, filter, map } = require('rxjs/operators');

const arrayDataObservable$ = from([1, 2, 3, 4, 5]);

const dataPipeline = arrayDataObservable$.pipe(
  tap(val => console.log('Value passing through the stream: ' + val)),
  filter(val => val > 2),
  map(val => val * 2)
)

const subscribeToBaseObservable = subscriberName => {
  return arrayDataObservable$.subscribe(val => {
    console.log(subscriberName + ' received: ' + val);
  })
}

const subscribeToDataPipeline = subscriberName => {
  return dataPipeline.subscribe(val => {
    console.log(subscriberName + ' received: ' + val);
  })
}

const handleSubscriptionToBaseObservable = () => {
  const subscription1 = subscribeToBaseObservable('Subscriber1');
  const subscription2 = subscribeToBaseObservable('Subscriber2');
}

const handleSubscriptionToDataPipeline = () => {
  const subscription1 = subscribeToDataPipeline('Subscriber1');
  const subscription2 = subscribeToDataPipeline('Subscriber2');
}

// 1. Execute this function first
handleSubscriptionToBaseObservable();

// 2. Execute this function next
//handleSubscriptionToDataPipeline();
//raw output
Subscriber1 received: 1
Subscriber1 received: 2
Subscriber1 received: 3
Subscriber1 received: 4
Subscriber1 received: 5
Subscriber2 received: 1
Subscriber2 received: 2
Subscriber2 received: 3
Subscriber2 received: 4
Subscriber2 received: 5
//filtered output
Value passing through the stream: 1
Value passing through the stream: 2
Value passing through the stream: 3
Subscriber1 received: 6
Value passing through the stream: 4
Subscriber1 received: 8
Value passing through the stream: 5
Subscriber1 received: 10
Value passing through the stream: 1
Value passing through the stream: 2
Value passing through the stream: 3
Subscriber2 received: 6
Value passing through the stream: 4
Subscriber2 received: 8
Value passing through the stream: 5
Subscriber2 received: 10
By executing the two different functions, you can see how pipelines can be used to deliver the same data in different ways based on the user's subscriptions. The tap operator logs every value as it enters the pipeline, before filter and map are applied, and passes the value along unchanged.
RxJS creational operators
The most common operators used in RxJS data pipelines are creational operators. We'll cover the simple from creational operator used in the previous section and the closely related of operator.
from
The from operator is used to wrap an array, a promise, or an iterable into an Observable. This operator directs the program to an already created data collection, like an array, that is then used to populate the observable values.
Here's an example:
const { from } = require('rxjs');
const DATA_SOURCE = [ 'String 1', 'String 2', 'Yet another string', 'I am the last string' ];
const observable$ = from(DATA_SOURCE)
observable$.subscribe(console.log)
// output
String 1
String 2
Yet another string
I am the last string
of
The of operator is the second most common creational operator. It is syntactically similar to from, but of accepts a sequence of separate arguments rather than iterable data like arrays, and emits each argument as its own value. If it does get an array, of simply emits the whole array as a single value, like a declarative statement. When wrapping data in observables, of is best used if the data does not make sense in an array.
const { of } = require('rxjs');
const DATA_SOURCE = [ 'String 1', 'String 2', 'Yet another string', 'I am the last string' ];
const observableArray$ = of(DATA_SOURCE)
console.log("Array data source")
observableArray$.subscribe(console.log)
console.log("\n")
console.log("Sequence data source")
const observableSequence$ = of('String 1', 'String 2', 'Yet another string', 'I am the last string')
observableSequence$.subscribe(console.log)
//output
Array data source
[ 'String 1',
'String 2',
'Yet another string',
'I am the last string' ]
Sequence data source
String 1
String 2
Yet another string
I am the last string
RxJS pipe function and pipeable operators
The pipe() function calls all operators other than creational operators. These non-creational operators are the second type of operator, called pipeable operators.
Pipeable operators take one observable as input and return an observable as output to continue the pipeline. They can be called like a normal function, op1()(obs), but are more often called in sequence to form a data pipeline. The pipe() function is a cleaner way to call multiple operators in sequence and is therefore the preferred way to call operators.
// standard
op4()(op3()(op2()(op1()(obs))))
// pipe function
obs.pipe(
  op1(),
  op2(),
  op3(),
  op4(),
)
It is considered best practice to use the pipe() function even if you only call one operator.
RxJS filtering operators
The most common type of pipeable operator is the filtering operator. These operators remove all values that do not fit their passed criteria. We'll look at the popular filter and first filtering operators.
filter
The filter operator takes a predicate function, like val => val + 1 == 3, that is applied to all passed values. Each value is passed to the predicate function, and filter keeps only the values for which the function returns true.
The example below will only allow even numbers through:
const { from } = require('rxjs');
const { filter } = require('rxjs/operators');
const observable$ = from([1, 2, 3, 4, 5, 6])
observable$.pipe(
  filter(val => val % 2 == 0)
).subscribe(console.log)
//output
2
4
6
The filter operator is a great tool to transform data to suit the needs of specific subscribers. For example, some users may want to see all product listings while others may only want to see products from a certain price range.
first
The first operator can be used in two ways. By default, it returns the first value emitted by the observable. The benefit of returning the first value is that the turnaround time is very low, making this use great for times when a simple, quick response is sufficient.
const { from } = require('rxjs');
const { first } = require('rxjs/operators');
const observable$ = from([1, 2, 3, 4, 5, 6])
// take first
observable$.pipe(
  first()
).subscribe(console.log)
// output
1
The other use of the first operator adds a predicate function and an optional default value. Similar to filter, first tests each passed value against the predicate and returns the first value that matches. If no value matches before the observable completes, first returns the default value instead. This use helps you search a data stream when you only need one value.
const { from } = require('rxjs');
const { first } = require('rxjs/operators');
const observable$ = from([1, 2, 3, 4, 5, 6])
// Example 1 - take first that passes the predicate, or default otherwise
observable$.pipe(
  first(val => val > 6, -1)
).subscribe(console.log)
//output
-1
Filter operations are pure operations, meaning that the original observable data is not affected by any transformations to the emitted data stream.
What to learn next
Congratulations on completing our quick tutorial on RxJS basics. You now have the tools to create observables, use common creational and pipeable operators, and string them all together with the pipe() function.
But, this is only a snapshot of what RxJS can offer you. Some intermediate topics you can tackle are:
- Subjects
- Transformational and Combinational Operators
- Custom Operators
- DOM event integration
- Reactive error handling
To help you pick up these advanced topics, Educative has developed Building Reactive Applications with RxJS. This course is full of interactive code blocks and full coding projects to help give you the hands-on experience you'll need to master RxJS. By the end, you'll have extensive reactive programming skills to develop your very own web applications!