DEV Community

Eyas

Originally published at blog.eyas.sh

Learning by Implementing: Observables

Sometimes, the best way to learn a new concept is to try to implement it. In my own journey with reactive programming, my attempts at implementing Observables were key to my ability to intuit how to best use them. In this post, we’ll try various strategies for implementing an Observable and see if we can get to a working solution.

I’ll be using TypeScript and working to implement something similar to RxJS Observables in these examples, but the intuition should be broadly applicable.

First things first, though: what are we trying to implement? My favorite way of motivating Observables is by analogy. If you have some type, T, you might represent it in asynchronous programming as Future<T> or Promise<T>. Just as futures and promises are the asynchronous analog of a plain type, an Observable<T> is the asynchronous construct representing a collection of T.

The basic API for Observable is a subscribe method that takes a bunch of callbacks, each triggering on a certain event:

interface ObservableLike<T> {
  subscribe(
      onNext?: (item: T) => void,
      onError?: (error: unknown) => void,
      onDone?: () => void): Subscription;
}

interface Subscription {
  unsubscribe(): void;
}
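To make the contract concrete, here is a minimal sketch of a hand-written ObservableLike. The fromArray factory is hypothetical (it is not part of this post or of RxJS): it synchronously hands each array element to onNext, then calls onDone.

```typescript
// The interfaces from above, repeated so this sketch is self-contained.
interface ObservableLike<T> {
  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription;
}

interface Subscription {
  unsubscribe(): void;
}

// Hypothetical factory: emits every element synchronously, then "done".
function fromArray<T>(items: readonly T[]): ObservableLike<T> {
  return {
    subscribe(onNext, _onError, onDone) {
      // Everything happens before subscribe() returns, so by the time the
      // caller holds the Subscription there is nothing left to cancel.
      items.forEach(item => onNext?.(item));
      onDone?.();
      return { unsubscribe: () => {} };
    }
  };
}

const seen: string[] = [];
fromArray([1, 2, 3]).subscribe(
  n => seen.push(`next ${n}`),
  err => seen.push(`error ${String(err)}`),
  () => seen.push("done")
);
// seen: ["next 1", "next 2", "next 3", "done"]
```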

With that, let’s get to work!

First Attempt: Mutable Observables

One way of implementing an Observable is to have it keep track of its subscribers (in an array) and send events to those listeners as they happen.

For the purpose of this and other implementations, we’ll define an internal representation of a Subscription as follows:

interface SubscriptionInternal<T> {
  onNext?: (item: T) => void;
  onError?: (error: unknown) => void;
  onDone?: () => void;
}

Therefore, we could define an Observable as such:

(BAD)

class Observable<T> implements ObservableLike<T> {
  private readonly subscribers: Array<SubscriptionInternal<T>> = [];

  triggerNext(item: T) {
    this.subscribers.forEach(sub => sub.onNext && sub.onNext(item));
  }

  triggerError(err: unknown) {
    this.subscribers.forEach(sub => sub.onError && sub.onError(err));
  }

  triggerDone() {
    this.subscribers.forEach(sub => sub.onDone && sub.onDone());
    this.subscribers.splice(0, this.subscribers.length);
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const subInternal: SubscriptionInternal<T> = {
      onNext,
      onError,
      onDone
    };

    this.subscribers.push(subInternal);
    return {
      unsubscribe: () => {
        const index = this.subscribers.indexOf(subInternal);
        if (index !== -1) {
          onDone && onDone(); // Maybe???
          this.subscribers.splice(index, 1);
        }
      }
    };
  }
}

This would be used as follows:

// Someone creates an observable:
const obs = new Observable<number>();
obs.triggerNext(5);
obs.triggerDone();

// Someone uses an observable
obs.subscribe(next => alert(`I got ${next}`), undefined, () => alert("done"));

Problems

There are a few fundamental problems going on here:

  1. The implementer doesn’t know when subscribers will start listening, and thus won’t know if triggering an event will be heard by no one,
  2. Related to the above, this implementation always creates hot observables; the Observable can start triggering events immediately after creation, depending on the creator, and
  3. Mutable: Anyone who receives the Observable can call triggerNext, triggerError, and triggerDone on it, which would interfere with everyone else.
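A minimal sketch of problems 1 and 3 in action. It uses a condensed copy of the mutable Observable above that only tracks onNext; untrustedConsumer is a hypothetical function standing in for any code that happens to receive the Observable.

```typescript
// Condensed copy of the mutable Observable above: only onNext is tracked.
class Observable<T> {
  private readonly listeners: Array<(item: T) => void> = [];

  triggerNext(item: T) {
    this.listeners.forEach(listener => listener(item));
  }

  subscribe(onNext: (item: T) => void) {
    this.listeners.push(onNext);
  }
}

const obs = new Observable<number>();
const heard: number[] = [];

// Problem 1: this fires before anyone subscribes, so nobody hears it.
obs.triggerNext(5);
obs.subscribe(n => heard.push(n));

// Problem 3: any holder of `obs`, not only its creator, can emit.
function untrustedConsumer(o: Observable<number>) {
  o.triggerNext(-1);
}
untrustedConsumer(obs);
obs.triggerNext(6);
// heard: [-1, 6]; the 5 was lost and the -1 came from a consumer.
```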

The current implementation has further limitations: it can error multiple times, a “done” Observable can trigger again, and an Observable can move back and forth between “done”, triggering, and “errored” states. But adding state tracking to prevent this wouldn’t be fundamentally complicated. We also need to think more about errors thrown inside the callbacks, and what effect they should have on other subscribers.
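As a rough illustration of why the state tracking is not the hard part, here is a sketch, still in the mutable style, where a single hypothetical state field makes “done” and “errored” terminal. It deliberately leaves the questions about errors thrown by callbacks unanswered.

```typescript
interface SubscriptionInternal<T> {
  onNext?: (item: T) => void;
  onError?: (error: unknown) => void;
  onDone?: () => void;
}

// Hypothetical lifecycle states; "done" and "errored" are terminal.
type State = "open" | "done" | "errored";

class Observable<T> {
  private state: State = "open";
  private readonly subscribers: Array<SubscriptionInternal<T>> = [];

  triggerNext(item: T) {
    if (this.state !== "open") return; // ignore events after closing
    this.subscribers.forEach(sub => sub.onNext && sub.onNext(item));
  }

  triggerError(err: unknown) {
    if (this.state !== "open") return; // at most one terminal event
    this.state = "errored";
    this.subscribers.forEach(sub => sub.onError && sub.onError(err));
    this.subscribers.length = 0;
  }

  triggerDone() {
    if (this.state !== "open") return;
    this.state = "done";
    this.subscribers.forEach(sub => sub.onDone && sub.onDone());
    this.subscribers.length = 0;
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ) {
    this.subscribers.push({ onNext, onError, onDone });
  }
}

const events: string[] = [];
const obs = new Observable<number>();
obs.subscribe(
  n => events.push(`next ${n}`),
  err => events.push(`error ${String(err)}`),
  () => events.push("done")
);
obs.triggerNext(1);
obs.triggerDone();
obs.triggerNext(2); // ignored: already done
obs.triggerError("late"); // ignored: cannot move from done to errored
// events: ["next 1", "done"]
```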

Second Attempt: Hot Immutable Observables

Let’s first solve the mutability problem. One approach is to pass around a ReadonlyObservable interface that hides the mutating methods. But any downstream user casting it back to Observable could wreak havoc, never mind plain JS users who just see these methods on the object.
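A small sketch of that loophole. ReadonlyObservable is a hypothetical interface exposing only subscribe, and Observable is a condensed mutable version that only tracks onNext; a single type assertion is enough to reach the hidden method.

```typescript
// Hypothetical read-only view of an Observable.
interface ReadonlyObservable<T> {
  subscribe(onNext: (item: T) => void): void;
}

// Condensed mutable Observable (onNext only).
class Observable<T> implements ReadonlyObservable<T> {
  private readonly listeners: Array<(item: T) => void> = [];

  triggerNext(item: T) {
    this.listeners.forEach(listener => listener(item));
  }

  subscribe(onNext: (item: T) => void) {
    this.listeners.push(onNext);
  }
}

const source = new Observable<string>();
// Hand out only the read-only type...
const view: ReadonlyObservable<string> = source;

const received: string[] = [];
view.subscribe(s => received.push(s));

// ...but casting back to the concrete class recovers the mutating method.
(view as Observable<string>).triggerNext("injected");
// received: ["injected"]
```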

A cleaner approach in JavaScript is to borrow the executor pattern from the Promise constructor, where the constructor must be passed a user-defined function that defines when an Observable triggers:

(Still BAD)

class Observable<T> implements ObservableLike<T> {
  private readonly subscribers: Array<SubscriptionInternal<T>> = [];

  constructor(
    executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => void
  ) {
    const next = (item: T) => {
      this.subscribers.forEach(sub => sub.onNext && sub.onNext(item));
    };

    const error = (err: unknown) => {
      this.subscribers.forEach(sub => sub.onError && sub.onError(err));
    };

    const done = () => {
      this.subscribers.forEach(sub => sub.onDone && sub.onDone());
      this.subscribers.splice(0, this.subscribers.length);
    };

    executor(next, error, done);
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const subInternal: SubscriptionInternal<T> = {
      onNext,
      onError,
      onDone
    };

    this.subscribers.push(subInternal);
    return {
      unsubscribe: () => {
        const index = this.subscribers.indexOf(subInternal);
        if (index !== -1) {
          onDone && onDone(); // Maybe???
          this.subscribers.splice(index, 1);
        }
      }
    };
  }
}

Much better! We can use this as such:

// Someone creates an observable:
const obs = new Observable<number>((next, error, done) => {
  next(5);
  done();
});

// Someone uses an observable
obs.subscribe(next => alert(`I got ${next}`), undefined, () => alert("done"));

This cleans up the API quite a bit. But the executor still runs synchronously inside the constructor, so with the code in this order, next(5) and done() fire before subscribe is ever called, and the subscriber sees no events.
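A sketch of both sides of that ordering problem, using a condensed copy of the Observable above (only onNext is delivered, and unsubscribe is omitted). The emitLater variable is a hypothetical stand-in for a timer or event source that calls next after subscribers have arrived.

```typescript
// Condensed copy of the "Hot Immutable" Observable: only onNext is delivered.
class Observable<T> {
  private readonly subscribers: Array<(item: T) => void> = [];

  constructor(
    executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => void
  ) {
    const next = (item: T) => this.subscribers.forEach(sub => sub(item));
    const error = (_err: unknown) => {}; // errors are ignored in this sketch
    const done = () => {
      this.subscribers.length = 0; // forget subscribers once done
    };
    executor(next, error, done); // runs before the constructor returns
  }

  subscribe(onNext: (item: T) => void) {
    this.subscribers.push(onNext);
  }
}

// Emits synchronously: the events fire before subscribe() can be called.
const eager = new Observable<number>((next, _error, done) => {
  next(5);
  done();
});
const eagerSeen: number[] = [];
eager.subscribe(n => eagerSeen.push(n));
// eagerSeen: []

// Hands `next` to something that fires later, so early subscribers hear it.
let emitLater: (item: number) => void = () => {};
const deferred = new Observable<number>(next => {
  emitLater = next;
});
const deferredSeen: number[] = [];
deferred.subscribe(n => deferredSeen.push(n));
emitLater(7);
// deferredSeen: [7]
```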

Good Examples

We can already use this type of code to create helpful Observables:

// Create an Observable of a specific event in the DOM.
function fromEvent<K extends keyof HTMLElementEventMap>(
  element: HTMLElement,
  event: K
): Observable<HTMLElementEventMap[K]> {
  return new Observable<HTMLElementEventMap[K]>((next, error, done) => {
    element.addEventListener(event, next);
    // Never Done.
  });
}

const clicks: Observable<MouseEvent> = fromEvent(document.body, "click");

(More examples in original article.)

Even these examples have some issues: they keep running even when no one is listening. That’s sometimes fine, if we know we’ll only have one Observable, or we’re sure callers are listening and so tracking that state is unnecessary overhead, but it’s starting to point to certain smells.
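To see the “keeps running” smell without a browser, here is a sketch in which a hypothetical FakeElement stands in for an HTMLElement, paired with a condensed Hot Immutable Observable (onNext only). After the only subscriber leaves, the element still holds, and still runs, the Observable’s listener.

```typescript
// Hypothetical stand-in for an HTMLElement, so this runs outside a browser.
class FakeElement {
  readonly listeners: Array<(event: string) => void> = [];

  addEventListener(listener: (event: string) => void) {
    this.listeners.push(listener);
  }

  dispatch(event: string) {
    this.listeners.forEach(listener => listener(event));
  }
}

// Condensed "Hot Immutable" Observable: onNext only, with unsubscribe.
class Observable<T> {
  private readonly subscribers: Array<(item: T) => void> = [];

  constructor(executor: (next: (item: T) => void) => void) {
    executor(item => this.subscribers.forEach(sub => sub(item)));
  }

  subscribe(onNext: (item: T) => void) {
    const subscriber = (item: T) => onNext(item); // unique per subscription
    this.subscribers.push(subscriber);
    return {
      unsubscribe: () => {
        const index = this.subscribers.indexOf(subscriber);
        if (index !== -1) this.subscribers.splice(index, 1);
      }
    };
  }
}

// Same shape as fromEvent above, against the fake element.
function fromFakeEvent(element: FakeElement): Observable<string> {
  return new Observable<string>(next => element.addEventListener(next));
}

const element = new FakeElement();
const clicks = fromFakeEvent(element); // listener is attached here, eagerly
const subscription = clicks.subscribe(() => {});
subscription.unsubscribe();

// Nobody is subscribed, yet the element still runs the Observable's listener.
element.dispatch("click");
// element.listeners.length: 1
```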

Bad Examples

One common Observable factory is of, which creates an Observable that emits one item. The expectation is that:

const obs: Observable<number> = of(42);
obs.subscribe(next => alert(`The answer is ${next}`));

... would work, and result in “The answer is 42” being alerted. But a naive implementation, such as:

function of<T>(item: T): Observable<T> {
  return new Observable<T>((next, error, done) => {
    next(item);
    done();
  });
}

... would result in the event happening before anyone has the chance to subscribe. Tricks like setTimeout work for code that subscribes immediately afterward, but they are fundamentally broken if we want to generalize this to someone who subscribes at a later point.
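A deterministic sketch of why the setTimeout trick only helps subscribers who arrive before the timer fires. The defer and flush helpers are hypothetical stand-ins for setTimeout and the passage of time, and the Observable is a condensed copy of the hot, executor-based version (onNext only).

```typescript
// Condensed "Hot Immutable" Observable: only onNext is delivered.
class Observable<T> {
  private readonly subscribers: Array<(item: T) => void> = [];

  constructor(
    executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => void
  ) {
    executor(
      item => this.subscribers.forEach(sub => sub(item)),
      () => {}, // errors are ignored in this sketch
      () => {
        this.subscribers.length = 0;
      }
    );
  }

  subscribe(onNext: (item: T) => void) {
    this.subscribers.push(onNext);
  }
}

// Hypothetical stand-ins for setTimeout: callbacks wait until flush().
const queue: Array<() => void> = [];
function defer(callback: () => void) {
  queue.push(callback);
}
function flush() {
  while (queue.length > 0) queue.shift()!();
}

// `of`, with emission pushed to a later "tick".
function ofDeferred<T>(item: T): Observable<T> {
  return new Observable<T>((next, _error, done) => {
    defer(() => {
      next(item);
      done();
    });
  });
}

const answer = ofDeferred(42);
const early: number[] = [];
const late: number[] = [];

answer.subscribe(n => early.push(n)); // before the tick: hears 42
flush();
answer.subscribe(n => late.push(n)); // after the tick: hears nothing
flush();
// early: [42], late: []
```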

The case for Cold Observables

We can try to make our Observables lazy, meaning they only start acting on the world once subscribed to. Note that by lazy I don’t just mean that a shared Observable will only start triggering once someone subscribes to it — I mean something stronger: an Observable will trigger for each subscriber.

For example, we’d like this to work properly:

const obs: Observable<number> = of(42);
obs.subscribe(next => alert(`The answer is ${next}`));
obs.subscribe(next => alert(`The second answer is ${next}`)); 
setTimeout(() => {
  obs.subscribe(next => alert(`The third answer is ${next}`)); 
}, 1000);

Where we get three alert messages, each showing the contents of the event.

Third Attempt: Cold Observables (v1)

(Still BAD)

type UnsubscribeCallback = (() => void) | void;

class Observable<T> implements ObservableLike<T> {
  constructor(
    private readonly executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => UnsubscribeCallback
  ) {}

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const noop = () => {};
    const unsubscribe = this.executor(
      onNext || noop,
      onError || noop,
      onDone || noop
    );

    return {
      unsubscribe: unsubscribe || noop
    };
  }
}

In this attempt, each Subscription will run the executor separately, triggering onNext, onError, and onDone for each subscriber as needed. This is pretty cool! The naive implementation of of works just fine. I also snuck in a simple way to add cleanup logic to our executors: an executor may return a function, which becomes that subscription’s unsubscribe.
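A runnable sketch of that claim: a copy of the v1 class above (with the constructor parameter property written out as an explicit field) and the naive of, driven by three subscribers. Here the third subscribes synchronously rather than after a timeout; because every subscriber gets its own run of the executor, the timing no longer matters.

```typescript
interface Subscription {
  unsubscribe(): void;
}
type UnsubscribeCallback = (() => void) | void;
type Executor<T> = (
  next: (item: T) => void,
  error: (err: unknown) => void,
  done: () => void
) => UnsubscribeCallback;

// Copy of Cold Observables (v1), with the parameter property as a field.
class Observable<T> {
  private readonly executor: Executor<T>;

  constructor(executor: Executor<T>) {
    this.executor = executor;
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const noop = () => {};
    // Each subscription runs the executor anew.
    const unsubscribe = this.executor(onNext || noop, onError || noop, onDone || noop);
    return { unsubscribe: unsubscribe || noop };
  }
}

// The naive `of` from earlier, unchanged.
function of<T>(item: T): Observable<T> {
  return new Observable<T>((next, _error, done) => {
    next(item);
    done();
  });
}

const answer = of(42);
const log: string[] = [];
answer.subscribe(n => log.push(`first ${n}`));
answer.subscribe(n => log.push(`second ${n}`));
answer.subscribe(
  n => log.push(`third ${n}`),
  undefined,
  () => log.push("third done")
);
// log: ["first 42", "second 42", "third 42", "third done"]
```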

fromEvent would benefit from that, for example:

// Create an Observable of a specific event in the DOM.
function fromEvent<K extends keyof HTMLElementEventMap>(
  element: HTMLElement,
  event: K
): Observable<HTMLElementEventMap[K]> {
  return new Observable<HTMLElementEventMap[K]>((next, error, done) => {
    element.addEventListener(event, next);
    // Never Done.

    return () => {
      element.removeEventListener(event, next);
    };
  });
}

The nice thing about this is that we remove our listeners when a particular subscriber unsubscribes. Except now, we open as many listeners as subscribers. That might be okay for this one case, but we’ll want to figure out how to let users “multicast” (reuse underlying events, etc.) when they want to.
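One way to multicast, sketched under assumptions: a hypothetical share helper (loosely in the spirit of RxJS’s share operator, not its implementation) keeps a single upstream subscription open while at least one downstream subscriber exists. It forwards only next events; error and done are left out to keep it short. FakeElement is a hypothetical stand-in for a DOM element that exposes its listener list.

```typescript
interface Subscription {
  unsubscribe(): void;
}
type UnsubscribeCallback = (() => void) | void;
type Executor<T> = (
  next: (item: T) => void,
  error: (err: unknown) => void,
  done: () => void
) => UnsubscribeCallback;

// Cold Observable (v1), as above.
class Observable<T> {
  private readonly executor: Executor<T>;
  constructor(executor: Executor<T>) {
    this.executor = executor;
  }
  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const noop = () => {};
    const unsubscribe = this.executor(onNext || noop, onError || noop, onDone || noop);
    return { unsubscribe: unsubscribe || noop };
  }
}

// Hypothetical helper: one upstream subscription shared by all subscribers,
// opened by the first and closed when the last one leaves (next only).
function share<T>(source: Observable<T>): Observable<T> {
  const listeners: Array<(item: T) => void> = [];
  let upstream: Subscription | undefined;
  return new Observable<T>(next => {
    const listener = (item: T) => next(item); // unique per subscriber
    listeners.push(listener);
    if (!upstream) {
      upstream = source.subscribe(item => listeners.slice().forEach(l => l(item)));
    }
    return () => {
      const index = listeners.indexOf(listener);
      if (index !== -1) listeners.splice(index, 1);
      if (listeners.length === 0 && upstream) {
        upstream.unsubscribe();
        upstream = undefined;
      }
    };
  });
}

// Hypothetical DOM stand-in that exposes its listeners for inspection.
class FakeElement {
  readonly listeners: Array<(event: string) => void> = [];
  dispatch(event: string) {
    this.listeners.slice().forEach(listener => listener(event));
  }
}

function fromFakeEvent(element: FakeElement): Observable<string> {
  return new Observable<string>(next => {
    const handler = (event: string) => next(event);
    element.listeners.push(handler);
    return () => {
      const index = element.listeners.indexOf(handler);
      if (index !== -1) element.listeners.splice(index, 1);
    };
  });
}

const element = new FakeElement();
const clicks = share(fromFakeEvent(element));
const a: string[] = [];
const b: string[] = [];
const subA = clicks.subscribe(e => a.push(e));
const subB = clicks.subscribe(e => b.push(e));
const listenersWhileShared = element.listeners.length; // 1, not 2
element.dispatch("click"); // one underlying listener feeds both subscribers
subA.unsubscribe();
subB.unsubscribe();
// a: ["click"], b: ["click"], element.listeners.length: 0
```

Because share only closes the upstream subscription when the last subscriber leaves, a subscriber arriving after that starts a fresh upstream subscription; whether that is desirable depends on the source.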

We still haven’t figured out proper error handling and cleanup. For example:

  1. It is generally accepted that a subscription that errors is closed (just like how throwing an error inside a for loop terminates that loop)
  2. When a subscriber unsubscribes, we should probably get that “onDone” event.
  3. When there’s an error, we should probably do some cleanup.

Better Cold Observables

Here’s a re-implementation of subscribe that might satisfy these conditions:

class Observable<T> implements ObservableLike<T> {
  constructor(
    private readonly executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => UnsubscribeCallback
  ) {}

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    let dispose: UnsubscribeCallback;
    let running = true;
    const unsubscribe = () => {
      // Do not allow retriggering:
      onNext = onError = undefined;

      onDone && onDone();
      // Don't notify someone of "done" again if we unsubscribe.
      onDone = undefined;

      if (dispose) {
        dispose();
        // Don't dispose twice if we unsubscribe.
        dispose = undefined;
      }

      running = false;
    };

    const error = (err: unknown) => {
      onError && onError(err);
      unsubscribe();
    };

    const done = () => {
      unsubscribe();
    };

    const next = (item: T) => {
      try {
        onNext && onNext(item);
      } catch (e) {
        // error() already forwards to onError, so calling onError here
        // as well would notify the subscriber twice.
        error(e);
      }
    };

    dispose = this.executor(next, error, done);

    // We just assigned dispose. If the executor itself already
    // triggered done() or error(), then unsubscribe() was called
    // before dispose was assigned.
    // To guard against those cases, call dispose now, and clear it so a
    // later unsubscribe() doesn't dispose a second time.
    if (!running) {
      dispose && dispose();
      dispose = undefined;
    }

    return {
      unsubscribe: () => unsubscribe()
    };
  }
}
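A sketch that exercises the three conditions listed earlier, using a condensed copy of the class above (the constructor parameter property is written as an explicit field). In this copy, a throwing onNext is reported to onError exactly once, and the dispose call made after a synchronous completion is cleared so a later unsubscribe cannot run it again. The emit variable is a hypothetical hook for pushing values from outside the executor.

```typescript
interface Subscription {
  unsubscribe(): void;
}
type UnsubscribeCallback = (() => void) | void;
type Executor<T> = (
  next: (item: T) => void,
  error: (err: unknown) => void,
  done: () => void
) => UnsubscribeCallback;

// Condensed copy of the "Better Cold Observables" class.
class Observable<T> {
  private readonly executor: Executor<T>;

  constructor(executor: Executor<T>) {
    this.executor = executor;
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    let dispose: UnsubscribeCallback;
    let running = true;
    const unsubscribe = () => {
      onNext = onError = undefined; // no more events after closing
      onDone && onDone();
      onDone = undefined; // "done" is reported at most once
      if (dispose) {
        dispose();
        dispose = undefined; // cleanup runs at most once
      }
      running = false;
    };
    const error = (err: unknown) => {
      onError && onError(err);
      unsubscribe();
    };
    const done = () => unsubscribe();
    const next = (item: T) => {
      try {
        onNext && onNext(item);
      } catch (e) {
        error(e); // reports to onError once, then closes
      }
    };

    dispose = this.executor(next, error, done);
    if (!running) {
      // The executor closed the subscription synchronously: clean up now.
      dispose && dispose();
      dispose = undefined;
    }
    return { unsubscribe: () => unsubscribe() };
  }
}

const log: string[] = [];

// Synchronous completion: cleanup still runs, exactly once.
const sync = new Observable<number>((next, _error, done) => {
  next(1);
  done();
  return () => log.push("sync disposed");
});
const syncSub = sync.subscribe(
  n => log.push(`sync next ${n}`),
  undefined,
  () => log.push("sync done")
);
syncSub.unsubscribe(); // no second "done" or "disposed"

// A throwing onNext closes the subscription.
let emit: (n: number) => void = () => {};
const manual = new Observable<number>(next => {
  emit = next;
  return () => log.push("manual disposed");
});
manual.subscribe(
  n => {
    if (n === 2) throw new Error("boom");
    log.push(`manual next ${n}`);
  },
  err => log.push(`manual error ${(err as Error).message}`),
  () => log.push("manual done")
);
emit(1);
emit(2);
emit(3); // ignored: the subscription is already closed
```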

Using Observables

Taking the “Better Cold Observables” example, let’s see how we can use Observables. These are described in the original article, and they include:

  1. Useful factories, like throwError or zip
  2. Useful operators like map or filter
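The original article has my actual versions of these. As a stand-in, here is a hedged sketch of two factories (a variadic of and throwError) and two operators (map and filter) written against a condensed copy of the Better Cold Observables class. The signatures are assumptions: these operators take the source as their first argument rather than matching RxJS’s pipeable operators.

```typescript
interface Subscription {
  unsubscribe(): void;
}
type UnsubscribeCallback = (() => void) | void;
type Executor<T> = (
  next: (item: T) => void,
  error: (err: unknown) => void,
  done: () => void
) => UnsubscribeCallback;

// Condensed copy of the "Better Cold Observables" class.
class Observable<T> {
  private readonly executor: Executor<T>;
  constructor(executor: Executor<T>) {
    this.executor = executor;
  }
  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    let dispose: UnsubscribeCallback;
    let running = true;
    const unsubscribe = () => {
      onNext = onError = undefined;
      onDone && onDone();
      onDone = undefined;
      if (dispose) {
        dispose();
        dispose = undefined;
      }
      running = false;
    };
    const error = (err: unknown) => {
      onError && onError(err);
      unsubscribe();
    };
    const next = (item: T) => {
      try {
        onNext && onNext(item);
      } catch (e) {
        error(e);
      }
    };
    dispose = this.executor(next, error, unsubscribe); // done == unsubscribe
    if (!running) {
      dispose && dispose();
      dispose = undefined;
    }
    return { unsubscribe };
  }
}

// Hypothetical factory: emits each item, then completes.
function of<T>(...items: T[]): Observable<T> {
  return new Observable<T>((next, _error, done) => {
    items.forEach(item => next(item));
    done();
  });
}

// Hypothetical factory: errors immediately for every subscriber.
function throwError<T>(err: unknown): Observable<T> {
  return new Observable<T>((_next, error) => error(err));
}

// Hypothetical operators: each returns a cold Observable that subscribes to
// `source` only when subscribed to, and tears that subscription down with it.
function map<A, B>(source: Observable<A>, project: (a: A) => B): Observable<B> {
  return new Observable<B>((next, error, done) => {
    const sub = source.subscribe(a => next(project(a)), error, done);
    return () => sub.unsubscribe();
  });
}

function filter<T>(source: Observable<T>, predicate: (t: T) => boolean): Observable<T> {
  return new Observable<T>((next, error, done) => {
    const sub = source.subscribe(t => { if (predicate(t)) next(t); }, error, done);
    return () => sub.unsubscribe();
  });
}

const out: string[] = [];
map(filter(of(1, 2, 3, 4), n => n % 2 === 0), n => n * 10).subscribe(
  n => out.push(`next ${n}`),
  err => out.push(`error ${String(err)}`),
  () => out.push("done")
);
// out: ["next 20", "next 40", "done"]

const failed: string[] = [];
map(throwError<number>("nope"), n => n + 1).subscribe(
  undefined,
  err => failed.push(`error ${String(err)}`),
  () => failed.push("done")
);
// failed: ["error nope", "done"]; in this design onDone follows onError.
```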

Conclusion

I didn’t really try to sell you, dear reader, on why you should use Observables as helpful tools in your repertoire. Some of my other writing showing their use cases (here, here, and here) might be helpful. But really, what I wanted to demonstrate is some of the intuition on how Observables work. The implementation I shared isn’t a complete one; for that, you’d better consult Observable.ts in the RxJS implementation. This implementation is notably missing a few things:

  • We could still do much better on error handling (especially in my operators)
  • RxJS observables include the pipe() method, which makes applying one or more of those operators to transform an Observable much more ergonomic
  • Lots of things here and there
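RxJS’s pipe() applies operators left to right. As a rough sketch of the idea (not RxJS’s implementation), assume operators are plain functions from one Observable to another; pipe() is then just a fold. The Operator type, the curried map and filter, and the two-operator overload limit are all assumptions made for this example, built on a copy of the v1 cold Observable.

```typescript
interface Subscription {
  unsubscribe(): void;
}
type UnsubscribeCallback = (() => void) | void;
type Executor<T> = (
  next: (item: T) => void,
  error: (err: unknown) => void,
  done: () => void
) => UnsubscribeCallback;
// Hypothetical operator type: a function from one Observable to another.
type Operator<A, B> = (source: Observable<A>) => Observable<B>;

// Minimal cold Observable (the v1 design), plus a hypothetical pipe().
class Observable<T> {
  private readonly executor: Executor<T>;
  constructor(executor: Executor<T>) {
    this.executor = executor;
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const noop = () => {};
    const unsubscribe = this.executor(onNext || noop, onError || noop, onDone || noop);
    return { unsubscribe: unsubscribe || noop };
  }

  // Overloads keep each step's types; the body folds left to right.
  pipe(): Observable<T>;
  pipe<A>(op1: Operator<T, A>): Observable<A>;
  pipe<A, B>(op1: Operator<T, A>, op2: Operator<A, B>): Observable<B>;
  pipe(...operators: Array<Operator<any, any>>): Observable<any> {
    return operators.reduce((current, operator) => operator(current), this as Observable<any>);
  }
}

// Hypothetical curried operators, shaped to plug into pipe().
function map<A, B>(project: (a: A) => B): Operator<A, B> {
  return source =>
    new Observable<B>((next, error, done) => {
      const sub = source.subscribe(a => next(project(a)), error, done);
      return () => sub.unsubscribe();
    });
}

function filter<T>(predicate: (t: T) => boolean): Operator<T, T> {
  return source =>
    new Observable<T>((next, error, done) => {
      const sub = source.subscribe(t => { if (predicate(t)) next(t); }, error, done);
      return () => sub.unsubscribe();
    });
}

const numbers = new Observable<number>((next, _error, done) => {
  [1, 2, 3, 4].forEach(n => next(n));
  done();
});

const out: number[] = [];
numbers
  .pipe(
    filter((n: number) => n % 2 === 0),
    map((n: number) => n * 10)
  )
  .subscribe(n => out.push(n));
// out: [20, 40]
```

Compared with nesting calls such as map(filter(source, ...), ...), which read inside out, the pipe form reads in the order the data flows.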

See the original article in my blog
