Thomas Toledo-Pierre

A Simple Observable Class

Today, the RxJS library is well known among frontend developers. It is a powerful library, which I enjoy using!

Lately, I wanted to give some of my students a taste of that library by having them implement a rather simple Observable class.

What our Observable class should do

  • subscribe: obviously, we'd like our Observable to let users subscribe to its value(s). This method should take as an argument a function to execute every time the value changes, and it will return a Subscription object;
  • next: our Observable will really be closer to a Subject, as we're going to implement a next method. This method will let us update the Observable's inner value(s);
  • pipe: we would like our Observable to allow several transformations to be applied to its value(s) without modifying it directly. This method should return a new Observable;
  • unsubscribe: this method will actually belong to the Subscription class.

The specs are pretty simple, so let's get down to it.

A first implementation

For a start, here is our Observable class declaration:

class Observable {
}

Wonderful. Now, our class will contain an inner value, and this value will be private. We can achieve that in two ways: either use the #myPrivateField notation (at the time of writing it is still a stage 3 proposal, hopefully stage 4 in 2021), or declare everything in the constructor.

For a change, I will use the first way.

class Observable {
  #value;
  constructor(value) {
   this.#value = value;
  }
}

So now, we basically have a class with an inner value we cannot access.
We most certainly can move on to the next part of this article.
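
For comparison, here is a minimal sketch of the second way, where the value only lives in the constructor's closure. The names ClosureObservable and peek are hypothetical and exist only for this illustration; peek is there purely to show that methods created inside the constructor can still reach the value.

```javascript
// Sketch only: privacy through a closure instead of a #private field.
// `ClosureObservable` and `peek` are made-up names for this illustration.
class ClosureObservable {
  constructor(value) {
    // `value` is never stored on `this`; only functions created here can see it.
    this.peek = () => value;
  }
}

const closureObs = new ClosureObservable(12);
console.log(closureObs.peek()); // 12
console.log(closureObs.value);  // undefined
```

The trade-off is that every method needing the value has to be created inside the constructor, once per instance, which is one reason to prefer the #value notation here.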

The subscribe method

We want to register a function that will be executed every time our Observable's value changes.
To do that, we need a simple array (let's call it subscriptions and make it a private field) and a method that pushes the function into the array. This method should also execute the function as soon as it is subscribed, passing the current value to it.

class Observable {
  #value;
  #subscriptions = [];
  constructor(value) {
   this.#value = value;    
  }

  subscribe(f) {
   this.#subscriptions.push(f);
   f(this.#value);
  }
}
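
As a quick check of that behaviour, the following sketch repeats the class and subscribes a function that records what it receives. The received array is only an assumption made for the demonstration.

```javascript
class Observable {
  #value;
  #subscriptions = [];
  constructor(value) {
    this.#value = value;
  }

  subscribe(f) {
    this.#subscriptions.push(f);
    f(this.#value); // the subscriber immediately receives the current value
  }
}

// `received` is a helper for this demo, not part of the class.
const received = [];
const obs = new Observable(12);
obs.subscribe((val) => received.push(val));
console.log(received); // [ 12 ]
```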

The next method

This method should allow the user to update the inner value of the Observable. It should also trigger all of the subscriptions.

class Observable {
  #value;
  #subscriptions = [];
  constructor(value) {
   this.#value = value;    
  }

  subscribe(f) {
   this.#subscriptions.push(f);
   f(this.#value);
  }

  next(value) {
   this.#value = value;
   this.#subscriptions.forEach((f) => f(this.#value));
  }
}
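
To see next trigger every subscription, here is a small usage sketch with two subscribers. The log array and the a:/b: prefixes are assumptions used only to make the call order visible.

```javascript
class Observable {
  #value;
  #subscriptions = [];
  constructor(value) {
    this.#value = value;
  }

  subscribe(f) {
    this.#subscriptions.push(f);
    f(this.#value);
  }

  next(value) {
    this.#value = value;
    // Subscribers are called in the order they subscribed.
    this.#subscriptions.forEach((f) => f(this.#value));
  }
}

const log = [];
const obs = new Observable(1);
obs.subscribe((val) => log.push(`a:${val}`));
obs.subscribe((val) => log.push(`b:${val}`));
obs.next(2);
console.log(log); // [ 'a:1', 'b:1', 'a:2', 'b:2' ]
```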

The pipe method

A pipe should take any number of functions as parameters and execute them in order, passing the result of each one to the next.

An implementation, using the Array.prototype.reduce method, could be this:

function pipe(...fList) {
  return (...args) => {
    // The reducer receives the accumulated value first, then the next function.
    return fList.slice(1)
          .reduce((val, f) => f(val), fList[0](...args));
  }
}
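
Here is a usage sketch of such a standalone pipe, written with the reducer taking the accumulated value first and the next function second. The helpers add, double and toLabel are made up for the example.

```javascript
function pipe(...fList) {
  // The first function may take several arguments; the others take one.
  return (...args) =>
    fList.slice(1).reduce((val, f) => f(val), fList[0](...args));
}

// Hypothetical helpers, only here to have something to compose.
const add = (a, b) => a + b;
const double = (n) => n * 2;
const toLabel = (n) => `result: ${n}`;

const addDoubleLabel = pipe(add, double, toLabel);
console.log(addDoubleLabel(2, 3)); // "result: 10"
```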

This implementation actually returns a new function: the composition of everything it was given.
In our case, it will be a little different, as we already have an initial value and we do not want to return a function. Instead, our pipe method should return a new Observable, containing a new value.

class Observable {
  #value;
  #subscriptions = [];
  constructor(value) {
   this.#value = value;    
  }

  subscribe(f) {
   this.#subscriptions.push(f);
   f(this.#value);
  }

  next(value) {
   this.#value = value;
   this.#subscriptions.forEach((f) => f(this.#value));
  }

  pipe(...fList) {
   // Apply each function in order, starting from the current value;
   // with no functions, the new Observable simply holds the same value.
   const res = fList.reduce((val, f) => f(val), this.#value);
   return new Observable(res);
  }
}
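
The sketch below uses a condensed copy of the class to show what this pipe does and does not do: the source keeps its own value, and the piped Observable is computed once from the value the source held when pipe was called. In this simple implementation it does not follow later calls to next on the source. The seen array is an assumption for the demo.

```javascript
class Observable {
  #value;
  #subscriptions = [];
  constructor(value) {
    this.#value = value;
  }
  subscribe(f) {
    this.#subscriptions.push(f);
    f(this.#value);
  }
  next(value) {
    this.#value = value;
    this.#subscriptions.forEach((f) => f(this.#value));
  }
  pipe(...fList) {
    const obs = new Observable();
    const res = fList.slice(1)
      .reduce((val, f) => f(val), fList[0](this.#value));
    obs.next(res);
    return obs;
  }
}

const seen = [];
const source = new Observable(2);
const piped = source.pipe((n) => n + 1, (n) => n * 10);

piped.subscribe((val) => seen.push(`piped:${val}`));
source.subscribe((val) => seen.push(`source:${val}`));
source.next(5); // only the source's subscriber is notified

console.log(seen); // [ 'piped:30', 'source:2', 'source:5' ]
```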

The unsubscribe method

As I previously said, the unsubscribe method will belong to a Subscription class.

This class should allow us to unregister a function previously registered with the Observable.prototype.subscribe method.

It will need no argument, and will return nothing. If the subscription is already unsubscribed, then it will silently do nothing.

It should work as follows:

const obs = new Observable(12);
// will print 12
const sub = obs.subscribe((val) => console.log(val)); 

// the subscription will print "toto"
obs.next('toto');
sub.unsubscribe();
obs.next('something'); // nothing will happen

The Subscription class should take a function as a constructor argument. This function will be its unsubscribe method, and it will be created when subscribe is called.

Here is how I am going to do it:

First of all, I am going to slightly change the way we store our functions in the Observable class: instead of an array, #subscriptions will be a Map.
This Map will pair an ID with a function.

Next, I am going to implement a Subscription class that only takes an unsubscribe function as a constructor parameter. If nothing is supplied, it will simply set a default function that does nothing.

Finally, in the subscribe method of Observable, I will refactor the code a bit and return a new Subscription.

class Observable {
  #value;
  #subscriptions = new Map();
  // Monotonic counter: using the Map size as an ID could reuse an ID
  // after an unsubscribe and overwrite a live subscription.
  #nextId = 0;
  constructor(value) {
   this.#value = value;
  }

  subscribe(f) {
   const id = this.#nextId++;
   this.#subscriptions.set(id, f);
   f(this.#value);
   // Braces so that unsubscribe returns nothing, as specified.
   return new Subscription(() => {
     this.#subscriptions.delete(id);
   });
  }

  next(value) {
   this.#value = value;
   this.#subscriptions.forEach((f) => f(this.#value));
  }

  pipe(...fList) {
   // Apply each function in order, starting from the current value.
   const res = fList.reduce((val, f) => f(val), this.#value);
   return new Observable(res);
  }
}

class Subscription {
  constructor(unsubscribe = () => void 0) {
   this.unsubscribe = unsubscribe;
  }
}

And voilà!
We now have a very minimalist Observable class with a Subscription mechanism, allowing us to defer treatments.
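
To exercise the whole mechanism, here is a sketch with three subscribers, one of which unsubscribes twice. It assumes a private #nextId counter for the IDs, so that an ID is never reused after an unsubscribe; the calls array is only there to record what happens.

```javascript
class Subscription {
  constructor(unsubscribe = () => void 0) {
    this.unsubscribe = unsubscribe;
  }
}

class Observable {
  #value;
  #subscriptions = new Map();
  #nextId = 0; // assumption: a counter, so IDs stay unique
  constructor(value) {
    this.#value = value;
  }
  subscribe(f) {
    const id = this.#nextId++;
    this.#subscriptions.set(id, f);
    f(this.#value);
    return new Subscription(() => {
      this.#subscriptions.delete(id);
    });
  }
  next(value) {
    this.#value = value;
    this.#subscriptions.forEach((f) => f(this.#value));
  }
}

const calls = [];
const obs = new Observable(0);
const subA = obs.subscribe((val) => calls.push(`a:${val}`));
const subB = obs.subscribe((val) => calls.push(`b:${val}`));

subA.unsubscribe();
subA.unsubscribe(); // already unsubscribed: silently does nothing

const subC = obs.subscribe((val) => calls.push(`c:${val}`));
obs.next(1);

console.log(calls); // [ 'a:0', 'b:0', 'c:0', 'b:1', 'c:1' ]
```

Subscribing c after a has left is the interesting case: with the Map size as an ID, c would get the same ID as b and replace it.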

Be careful using this implementation, as we did not cover the whole range of possible scenarios. For instance, the following code would not work:

const obs = new Observable([
   {name: 'john', age: 28}, 
   {name: 'bastien', age: 24}, 
   {name: 'julia', age: 40}
]);

const sub1 = obs.subscribe(
   ([john, bastien, julia]) => {
      console.log(john, bastien, julia);
   }
);

obs.next(12);

If you have not figured out why this code would throw an exception, try it in your browser console.

We will cover this, and much more, in the upcoming posts. In the meantime, do not hesitate to react in the comments and/or to give me your thoughts about it :)

Bye!
