Introduction
One of the effective ways to better know a technology is to get an idea of the underlying implementation and ideally try to build your own.
Our journey throughout this series is to build some of the core RxJS features from scratch, namely:
- The
Observable
class to create and subscribe to anObservable
stream -
Observable
creation utilities likefromEvent
andinterval
- Operators like
take
,map
andswitchMapTo
- The
pipe
utility method which simplifies the composition of several operators applied to anObservable
Consider the code below that starts a new countdown every time the user clicks on the Reset button.
<button id="start">Start</button>
<div id="text"></div>
import { interval, fromEvent } from 'rxjs';
import { switchMapTo, map, take, startWith } from 'rxjs/operators';
const countdownElem = document.getElementById('text');
function countdown(init, delay = 1000) {
return interval(delay).pipe(
take(init),
map(val => init - val - 1),
startWith(init)
);
}
const click$ = fromEvent(document.getElementById('start'), 'click');
const countdownFrom10$ = countdown(10);
const countdownFrom10OnClick$ = click$.pipe(switchMapTo(countdownFrom10$));
const text = document.getElementById('#text');
countdownFrom10OnClick$.subscribe({
next: text => {
countdownElem.innerHTML = `${text}`;
}
});
Here is a preview of the final result.
At the end of the article, all RxJS imports can be replaced by ours for the same result.
import { interval, fromEvent } from "./rxjs-dev";
import { switchMap, map, take } from "./rxjs-dev/operators";
Creating Observables
fromEvent
Let's start with the fromEvent
function.
import { fromEvent } from "rxjs";
const clicks$ = fromEvent(document, "click");
clicks$.subscribe({
next: (event) => console.log(event.clientX, event.clientY),
});
Behind the scenes we can imagine that fromEvent
uses addEventListener
, let's make a first version of it.
function fromEvent(target, eventName) {
return function (listener) {
target.addEventListener(eventName, listener);
};
}
const click$ = fromEvent(document, "click");
click$((event) => console.log(event.clientX, event.clientY));
Notice that fromEvent
does not directly call target.addEventListener
but it returns a function that calls it.
This is one of the key differences with Promises
.
A
Promise
is eager, it is executed immediately, without the need to call thethen
method on it.An
Observable
is lazy, it is constructed and later its logic is executed when we subscribe to it.
Let's adapt our code to get it closer to the fromEvent
api:
function fromEvent(target, eventName) {
return {
subscribe: (observer) => {
target.addEventListener((event) => {
observer.next(event);
});
},
};
}
const click$ = fromEvent(document, "click");
click$.subscribe({
next: (event) => console.log(event.clientX, event.clientY),
});
We have made two updates:
fromEvent
no longer returns a function but an object containing a methodsubscribe
that callstarget.addEventLister
when invoked. This is the beginning of anObservable
.we replaced the
listener
function with an object literal having a next method. This is anObserver
.
Essentially, we've just replaced callback functions with objects that have these specific contracts.
class Observable {
subscribe: (observer: Observer) => {
const data = []; // some logic here
observer.next(data)
};
}
interface Observer {
next(event: any): void;
}
Observable
Now, rather than returning an object literal, we want to create the Observable
instance from the Observable
class we shaped earlier.
function fromEvent(target, eventName): Observable {
// return {
// subscribe(observer: Observer) {
// target.addEventListener(eventName, (event) => {
// observer.next(event);
// });
// },
// };
return new Observable((observer: Observer) => {
target.addEventListener(eventName, (event) => {
observer.next(event);
});
});
}
Notice that the callback function passed to the Observable constructor
is exactly the subscribe
method we put in the object literal, we just need to store it for a later use; when the method subscribe is actually called.
class Observable {
private _subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
subscribe(observer: Observer) {
this._subscribe(observer);
}
}
const obs$ = new Observable((observer: Observer) => {
observer.next('some data');
});
const anObserver: Observer = {
next: (value) => console.log(value)
}
obs$.subscribe(anObserver);
So basically, the purpose of an Observable
is to wrap our usual callbacks with specific contracts so that we can compose them and build utilities around them as we will see next.
interval
Let's create the interval utility that creates an Observable
that emits sequential numbers every specified interval of time.
const interval = (period) => Observable {
return new Observable((observer: Observer) => {
let tick = 0;
setInterval((event) => {
observer.next(tick++);
}, period);
});
};
const interval$ = interval(1000);
interval$.subscribe({
next: (tick) => console.log(tick),
});
Pretty straightforward, right?
unsubscribe
Unsubscribing from an observable
means we're no longer interested on its future events. This is how we unsubscribe from an Observable
in RxJS.
const subscription: Subscription = interval$.subscribe({
next: console.log,
});
// Later
subscription.unsubscribe();
Unsubscribing from interval
Observable
means clearing the interval which has been set by setInterval
earlier because we're no longer interested on its data.
const interval = (period) => {
return new Observable((observer) => {
let tick = 0;
const timer = setInterval((event) => {
observer.next(tick++);
}, period);
return () => {
clearInterval(timer);
};
});
};
The teardown function returned on line 8 should be returned to be called using subscription.unsubscribe()
. subscription.unsubscribe
is our teardown function on line 8.
Let's adapt our Observable accordingly:
interface Subscription {
unsubscribe(): void;
}
class Observable {
private _subscribe;
constructor(subscribe) {
this._subscribe = subscribe;
}
subscribe(observer: Observer): Subscription {
const tearDownFunction = this._subscribe(observer);
return {
unsubscribe: tearDownFunction
}
}
}
Subscribing similiraly in fromEvent
:
function fromEvent(target, eventName): Observable {
return new Observable((observer: Observer) => {
const listener = observer.next;
target.addEventListener(eventName, listener);
return () => {
target.removeListener(listener);
};
});
}
const subscription: Subscription = fromEvent(document, "click").subscribe({
next: console.log,
});
// Later
subscription.unsubscribe();
Observable contract
There are three types of values an Observable Execution can deliver:
- "Next" sends a value
- "Error" sends an error and stops the observable
- "Complete" does not send a value and stops the observable
interface Observer {
next(data: any): void;
complete(): void;
error(error: any): void;
}
The Observable contract stipulates that whenever a complete or error messages are sent to the Observer
, the Observable stops, wich entails the following:
- The Observable unsubscribe method is called
- All the future calls to the observer methods are ignored
Given the code below:
new Observable((observer: Observer) => {
observer.next("Message 1");
observer.error();
observer.next("Message 2");
observer.complete();
return () => {
console.log("Unsubscribed!");
};
}).subscribe({
next: (value) => console.log(value),
complete: () => console.log("Complete"),
error: () => console.log("Error"),
});
The expected output according to the Observable contract is:
Message 1
Error
Unsubscribed
whereas the current output is:
Message 1
Error
Message 2
Complete
To fix our Observable
, we have to hook into the observer methods and, depending on the state of the Observable, decide whether to call its methods or not and unsubscribe in case of an error or completion.
class Observable {
private _subscribe;
private _unsubscribe;
private _stopped = true;
constructor(subscribe) {
this._subscribe = subscribe;
}
_stop() {
this._stopped = true;
setTimeout(() => {
this._unsubscribe();
});
}
subscribe(observer) {
this._stopped = false;
this._unsubscribe = this._subscribe({
next: (value) => {
if (!this._stopped) {
observer.next(value);
}
},
complete: () => {
if (!this._stopped) {
observer.complete();
this._stop();
}
},
error: () => {
if (!this._stopped) {
observer.error();
this._stop();
}
},
});
return { unsubscribe: this._unsubscribe };
}
}
And that's it!
Summary
We have seen that by passing some functions around we can build a minimalist version of an RxJS Observable. It goes without saying that it is not ready for production. ☠️
Resources
Practice
You might have noticed that the timer does not start right away when you click on the button. To fix that we can replace interval
with timer`.
It is your turn to implement it here.
On the next article we're going to re-implement some of the most used RxJS operators.
If you like the article, let me know, I hardly ever write, it will motivate me to produce more content.
Top comments (0)