DEV Community

Cover image for RxJS Schedulers
Eduard Amirbekyan for This is Learning

Posted on

RxJS Schedulers

In this article we will cover RxJS schedulers. Many RxJS users haven't heard of them or don't know about their use cases.

A Scheduler lets you define in what execution context will an Observable deliver notifications to its Observer.
(C) RxJS Documentation

In other words, a scheduler manages the execution order and time of an Observable operation. Let's take a look at an example.

import { of } from  'rxjs';

console.log('Start');
of('Stream').subscribe(console.log);
console.log('End');

/**
 * Logs:
 * Start
 * Stream
 * End
*/
Enter fullscreen mode Exit fullscreen mode

As we can see the stream code executes synchronously. In other words, the stream code executes in synchronous execution context.


How many execution contexts are there?

In browser code executes in following order:

  • The synchronous code is executed first (call stack).
  • Then the microtask queue commands are executed (Promise).
  • After that the macrotask queue commands are executed (setTimeout, setInterval, XMLHttpRequest, ...).
  • In the end there is a queue of calls which are executed right before the next cycle of rerendering (requestAnimationFrame).

For each of the points above there is a scheduler in RxJS:

  • queueScheduler - synchronous
  • asapScheduler - microtask
  • asyncScheduler - macrotask
  • animationFrameScheduler - animation frame

How to schedule?

  • To schedule in what execution context observable values will be emitted use the observeOn operator.
  • To schedule in what execution context the subscribe() call happen use the subscribeOn operator. By default, a subscribe() call on an Observable happens synchronously.

In other words, the observeOn operator plans in what execution context Observable.next(), Observable.error(), Observable.complete() methods will execute, and the subscribeOn operator affects the Subscriber, so the subscribe() call will execute in another context.

We can confirm the execution order of the same code using different schedulers.

import {
  animationFrameScheduler,
  asapScheduler,
  asyncScheduler,
  merge,
  of,
  queueScheduler
} from 'rxjs';
import {observeOn} from 'rxjs/operators';

const queue$ = of('queueScheduler').pipe(observeOn(queueScheduler));
const asap$ = of('asapScheduler').pipe(observeOn(asapScheduler));
const asynch$ = of('asyncScheduler').pipe(observeOn(asyncScheduler));
const animationFrame$ = of('animationFrameScheduler').pipe(
  observeOn(animationFrameScheduler)
);

merge(
  queue$,
  asap$,
  asynch$,
  animationFrame$
).subscribe(console.log);

console.log('synchronous code');

/**
 * Logs:
 * queueScheduler
 * synchronous code
 * asapScheduler
 * asyncScheduler
 * animationFrameScheduler
 */
Enter fullscreen mode Exit fullscreen mode

observeOn and subscribeOn operators take delay as second argument. By default its value is 0.

Note: for any provided non-zero delay and scheduler, asyncScheduler will be used.

Before RxJS version 6.5.0 we could provide a scheduler to several creational operators such as of, from, merge. In newer versions this behavior is deprecated. Now there is a function called scheduled instead.

import {asyncScheduler, of, scheduled} from 'rxjs';

/**
 * Deprecated:
 * of('async', asyncScheduler).subscribe(console.log);
 */

scheduled(of('async'), asyncScheduler).subscribe(console.log);
Enter fullscreen mode Exit fullscreen mode

Scheduler use cases

Cached observables

Suppose we have an Angular service MovieService. It has a method which gets a movie by ID and caches the result. We would implement it as follows:

@Injectable({providedIn: 'root'})
export class MovieService {
  private cache: Map<number, Movie> = new Map<number, Movie>();
  constructor(private readonly http: HttpClient) {}
  // ...
  public getById(id: number): Observable<Movie> {
    if (this.cache.has(id)) {
      return of(this.cache.get(id));
    }
    return this.http.get<Movie>(...).pipe(
      tap((movie: Movie) => this.cache.set(id, movie))
    );
  }
Enter fullscreen mode Exit fullscreen mode

This implementation works, but there is a catch. After calling this method with a specific ID for the first time the result will arrive asynchronously, but after calling it again the result will arrive synchronously. If you are interested why is this bad check out this article:

The issue is easily fixed with asyncScheduler:

@Injectable({providedIn: 'root'})
export class MovieService {
  private cache: Map<number, Movie> = new Map<number, Movie>();
  constructor(private readonly http: HttpClient) {}
  // ...
  public getById(id: number): Observable<Movie> {
    if (this.cache.has(id)) {
      return scheduled(of(this.cache.get(id)), asyncScheduler);
    }
    return this.http.get<Movie>(...);
  }
Enter fullscreen mode Exit fullscreen mode

Mocking services

The same issue will appear when we mock MovieService for tests.
Suppose our test file looks like this:

let movieService: MovieService;

beforeEach(() => {
  TestBed.configureTestingModule({
    providers: [
      {
        provide: MovieService,
        useValue: jasmine.createSpyObj('MovieService', 'getById')
      },
      ...
    ],
    ...
  });
  movieService = TestBed.get(MovieService);
  (movieService.getById as jasmine.Spy).and.returnValue(of(mockMovie));
});
Enter fullscreen mode Exit fullscreen mode

Tests will work, but it's wrong, since we return a synchronous result instead of an asynchronous result which is a realistic scenario. Again, the issue is fixed with asyncScheduler:

let movieService: MovieService;

beforeEach(() => {
  TestBed.configureTestingModule({
    providers: [
      {
        provide: MovieService,
        useValue: jasmine.createSpyObj('MovieService', 'getById')
      },
      ...
    ],
    ...
  });
  movieService = TestBed.get(MovieService);
  (movieService.getById as jasmine.Spy).and.returnValue(
    scheduled(of(mockMovie), asyncScheduler)
  );
});
Enter fullscreen mode Exit fullscreen mode

ExpressionChangedAfterItHasBeenCheckedError

Every Angular developer has encountered this error and found different ways to resolve it.

The following code will result in the error:

import { AfterViewInit, Component } from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;

  ngAfterViewInit() {
    this.name = 'John';
  }
}
Enter fullscreen mode Exit fullscreen mode

The first solution is updating the name property using setTimeout:

import { AfterViewInit, Component } from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;

  ngAfterViewInit() {
    setTimeout(() => {
      this.name = 'John';
    })
  }
}
Enter fullscreen mode Exit fullscreen mode

Another solution is running change detection after updating the property:

import {
  AfterViewInit,
  ChangeDetectorRef,
  Component,
} from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  constructor(private readonly cdRef: ChangeDetectorRef) {}

  public name: string;

  ngAfterViewInit() {
    this.name = 'John';
    this.cdRef.detectChanges();
  }
}
Enter fullscreen mode Exit fullscreen mode

Each of RxJS schedulers has schedule method, which runs provided callback function in the scheduler's respective execution context. Using that method also helps to solve the issue:

import { AfterViewInit, Component } from '@angular/core';

@Component({
  selector: 'hello',
  template: `<h1>Hello {{name}}!</h1>`,
  styles: [],
})
export class AppComponent implements AfterViewInit {
  public name: string;

  ngAfterViewInit() {
    /**
     * any of asyncScheduler, asapScheduler and animationFrameScheduler
     * solves the issue
     */
    asyncScheduler.schedule(() => {
      this.name = 'John';
    });
  }
}
Enter fullscreen mode Exit fullscreen mode

Oldest comments (0)