When working with rxjs, you might find yourself in a situation where you want to integrate a promise in a reactive code base. In order to embrace the full reactivity, it's a good idea to convert that promise into an observable so we can easily pipe other operators or even combine it with other streams.
Previously, rxjs had an operator that was specifically designed for this use-case: fromPromise
. Current versions of rxjs have dropped fromPromise
in favor of from
, however, there's no real difference in usage. The from
operator, apart from arrays and strings, accepts a promise in order to convert it into an Observable.
If you are interested in knowing how it handles a promise or how it defines whether or not it's a promise that's being passed in, have a look at https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/from.ts#L114 and https://github.com/ReactiveX/rxjs/blob/master/src/internal/util/subscribeTo.ts#L20
const url = 'https://jsonplaceholder.typicode.com/todos/1';
function getTodo() {
return fetch(url)
.then(response => response.json());
}
getTodo().then(console.log);
The above code is the promise representation of the snippet that we want to convert to using observables in such a way that we can integrate it with other, existing, observables.
This article is about a function that's returning a Promise that we'll be converting into an Observable, not just a standalone Promise.
Implementing the from operator comes down to wrapping the promise with the from operator and replacing .then(...)
with RXjs' map(...)
:
const url = 'https://jsonplaceholder.typicode.com/todos/1';
function getTodo() {
return from(fetch(url))
.pipe(map(response => response.json()));
}
getTodo().subscribe(console.log);
That should do it, right? We have successfully converted the promise returning function into a function that's returning an Observable. We can now start combining this with other Observables/Operators so that we can create more advanced streams.
But what if I told you this probably isn't what you want (yet)?
Lazy Observables
When using observables, it's not expected that anything happens for as long as there is no active subscription. However, removing the subscription from the above code will still trigger the HTTP call. You can see this in action here: https://stackblitz.com/edit/rxjs-bb626s
If you would inspect the DevTools' network tab, you will notice that the HTTP call is indeed triggered, even tho we do not have any subscription.
We can solve this by either using an existing rxjs operator, in combination with the from
operator we're already using or you can decide to build the observable from scratch.
Using the defer operator
Rxjs' defer operator can be used to wait until an observer subscribes before creating the actual observable.
function getTodo() {
return defer(() => from(fetch(url)));
}
const getTodo$ = getTodo();
setTimeout(() => {
getTodo$.subscribe();
}, 5000);
This will ensure that the HTTP call is only triggered after 5000ms, which is the moment that we're adding a subscription to the observable.
You can see this in action https://stackblitz.com/edit/rxjs-fgwokv
Note: Most of the time, you might be bringing in asynchronous data as a matter of a mergeMap/switchMap/exhaustMap/concatMap operation, which might be returning an Observable that's originating from a Promise in some cases. If that's the case, we, technically, have no need to use defer as the Observable will not be created until the source Observable emits. However, it's still not a bad idea to use defer either way to ensure the Promise is lazy, no matter how it's used.
Building the observable from scratch
Even tho I'd recommend using existing rxjs operators when possible, I think for converting a Promise to an Observable it's worth taking control over the Observable creation ourselves so that we have more control over what happens when we unsubscribe from the Observable (which we will cover in promise cancellation).
function getTodo() {
return new Observable(observer => {
return from(fetch(url)).subscribe(observer);
});
}
const getTodo$ = getTodo();
setTimeout(() => {
getTodo$.subscribe();
}, 5000);
The above code will create an observable based on the promise and only subscribe to it after 5000 ms. If you would have a look at this stackblitz https://stackblitz.com/edit/rxjs-4zj1bx, you will see that the HTTP call is only triggered after 5 seconds. So our observable is now lazy in such a way that it will only resolve the promise (and trigger the HTTP call) when a subscription is added.
Note that we are adding an explicit subscription which we're returning from the Observable's constructor callback as the teardown logic. Doing so ensures that that subscription is cleanup whenever we unsubscribe from the observable returned by
getTodo()
.
Promise cancellation
We're still missing one crucial part in our Promise to Observable conversion. In our case, the promise was representing an HTTP call. Whenever we unsubscribe from the observable before the HTTP call is finished, we probably want to abort the open HTTP request.
function getTodo() {
return new Observable(observer => {
const abortController = new AbortController();
const subscription = from(fetch(url, {
signal: abortController.signal
})).subscribe(observer);
return () => {
abortController.abort();
subscription.unsubscribe();
}
});
}
const getTodo$ = getTodo();
setTimeout(() => {
const sub = getTodo$.subscribe();
sub.unsubscribe();
}, 5000);
AbortController is a built-in interface that allows us to cancel DOM requests, including Promises. Even though a lot of async operations might require a custom AbortController implementation, the fetch API supports the AbortController by default. This means that all we need to do is create an AbortController instance, pass it's signal property to the fetch method and call abort whenever appropriate, in our case meaning in the TearDownLogic, which is called whenever we unsubscribe from the Observable. You can read more about Aborting a fetch on https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API.
Here's a stackblitz containing the functionality to abort the HTTP call: https://stackblitz.com/edit/rxjs-7wc1rb. If you'd inspect the DevTools' network tab, you'll notice an HTPP call is being triggered but it's instantly canceled.
Rxjs fetch operator
Rxjs has built-in support for converting the fetch API to an observable (see: https://github.com/ReactiveX/rxjs/blob/0e4849a36338133ac3c1b890cd68817547177f44/src/internal/observable/dom/fetch.ts
). As you might notice, it is also using the AbortController to cancel the HTTP call when unsubscribed from the Observable (even tho it's slightly more complicated because this article sticks to the basics of promise cancelation). You probably want to be using that instead of hard-crafting your own. However, this article is intended to give you an example on how we can convert any promise to an Observable.
Top comments (1)
So technically we can transform Promise to Observable. But is it possible to receive a stream from such Observable? As for Promise, it's just one fetch. I used from() and it was still one time the data was sent. Am I doing something wrong or is it simply impossible to get stream of data from Promise. Thanks in advance.