Amin


Observable & RxJS from scratch

An observable is a data structure that can emit multiple values over time.

A Promise can only ever resolve with a single value in the future. Observables fill that gap, and this is what makes them an interesting data structure.

However, observables can be hard to understand at times, and this article aims to close the gap between using observables and understanding how they work, especially in RxJS.

The goal of this article is to re-create an API similar to the one RxJS offers for dealing with observables.

Note that RxJS didn't create this concept: the Observer Pattern is a design pattern that existed long before this library and has been used in other contexts, such as desktop application frameworks like GTK or Qt, to name a few.

Observable

In the end, an observable is a data structure that can emit data, and which has subscribers that will be notified whenever a new value is emitted.

const observable = new Observable();

observable.subscribe((newValue) => {
  console.log(`Received value: ${newValue}`);
  // Received value: 1
  // Received value: 2
  // Received value: 3
});

observable.next(1);
observable.next(2);
observable.next(3);

Interestingly, you have probably already dealt with observable values when working with the DOM Web API.

const body = document.body

body.addEventListener('click', () => {
  console.log('clicked')
});

In that case, the only thing we don't see is the call to next. We can imagine that the browser calls it internally whenever it knows that the user clicked on the page, and we can react to this event since the browser emits it for us, the subscribers.
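To make that hidden call visible, here is a small sketch that plays the role of the browser by dispatching the events itself. It uses the standard EventTarget and Event classes instead of document.body so that it can also run outside of a page. The received array is only there for illustration.

```javascript
// A bare event target stands in for document.body
const target = new EventTarget();
const received = [];

// This is the "subscribe" part
target.addEventListener('click', (event) => {
  received.push(event.type);
});

// This is the "next" part, normally done by the browser on a real click
target.dispatchEvent(new Event('click'));
target.dispatchEvent(new Event('click'));

console.log(received);
// ["click", "click"]
```

Here dispatchEvent is what notifies every registered listener, which is the job that next does in the observable shown earlier.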

Here is an example implementation of what an Observable could look like.

class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }
}

Note that, in the RxJS sense, the name Observable is not quite appropriate here: what we are really creating is a Subject. I won't go into much detail about RxJS for now, since this concept could be named whatever we like and the only important thing here is the idea behind it.

As you can see, an observable in this implementation can be very easy to understand.

Subscribers are just an array of functions.

When emitting a value, we call all of the subscribers that we know are subscribed to updates and we send them the new value.
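As a quick illustration, the sketch below registers two subscribers on the same observable and records what each one receives. The temperature and log names are made up for this example.

```javascript
class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }
}

const temperature = new Observable();
const log = [];

// Two independent subscribers on the same observable
temperature.subscribe(value => {
  log.push(`first: ${value}`);
});

temperature.subscribe(value => {
  log.push(`second: ${value}`);
});

temperature.next(21);
temperature.next(22);

console.log(log);
// ["first: 21", "second: 21", "first: 22", "second: 22"]
```

Every subscriber is called for every emitted value, in the order in which it subscribed.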

That's it. If you reach this part of the article and understand what is going on here, congratulations, you now have a broader understanding of what an Observable is all about.

There is nothing more to it: this is a very easy data structure to understand.

But what makes it powerful is that we can combine multiple observables together.

Computed values

Computed values are a concept that you have probably already heard of if you work with Angular, Vue or React.

A computed value is a value that depends on another one: it gets recomputed whenever the value it depends on is emitted.

Let's take a look at an example.

const email = new Observable();

const trimmedEmail = new Observable();

trimmedEmail.subscribe(newTrimmedEmail => {
  console.log(newTrimmedEmail);
  // "email@domain.com"
});

email.subscribe(newEmail => {
  trimmedEmail.next(newEmail.trim());
});

email.next("   email@domain.com   ");

Here, we are creating two observables. One for sending email updates, for instance whenever a user is typing in a form field. And another one for sending the same email with its leading and trailing spaces trimmed, so that we never keep an email with unnecessary spaces around it.

Here, the only interesting value for us is of course the trimmed email since we probably want to store it instead of the raw email in our database.

For that, we subscribe to the raw email, and each time the raw email is updated (emitted), we send its trimmed version down the pipe using another observable.

The trimmed email is the one that we are really interested in, so this is the one that we listen to and display in the console. The other one is simply used to compute the proper email.

This is what a computed value is all about. In this example, the trimmed email is computed based on the raw email.
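Computed values can also depend on other computed values. The sketch below adds a hypothetical third observable, normalizedEmail, which lowercases whatever the trimmed email emits. The names are only illustrative.

```javascript
class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }
}

const email = new Observable();
const trimmedEmail = new Observable();
const normalizedEmail = new Observable();
const received = [];

normalizedEmail.subscribe(newNormalizedEmail => {
  received.push(newNormalizedEmail);
});

// normalizedEmail is computed from trimmedEmail
trimmedEmail.subscribe(newTrimmedEmail => {
  normalizedEmail.next(newTrimmedEmail.toLowerCase());
});

// trimmedEmail is computed from email
email.subscribe(newEmail => {
  trimmedEmail.next(newEmail.trim());
});

email.next("   Email@Domain.com   ");

console.log(received);
// ["email@domain.com"]
```

Each extra step needs its own observable and its own subscription, which quickly adds up.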

This concept easily applies when you take a look at what the Vue 3 Composition API looks like.

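import { ref, computed, watchEffect } from "vue";

// Start with an empty string so that trim() is never called on undefined
const email = ref("");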
const trimmedEmail = computed(() => email.value.trim());

watchEffect(() => {
  console.log(trimmedEmail.value);
});

email.value = "   email@domain.com   "

Of course, Vue uses its own Observable implementation, but we can easily understand this example now that we know that Vue uses observable values in order to track updates and effects.

Operators

Creating computed values for two or three observables is easy to manage, but when you have more and more transformations to apply to your observable data, it can become very confusing to manage all those subscriptions and values altogether.

Hence the need for a way to easily transform an observable into another observable, and this is where RxJS operators really shine on top of observables.

If we wanted to simplify our previous example, we could try to implement what RxJS does with its operators.

The most popular one is the map operator.

You probably know the Array.prototype.map method which takes a function and applies it to all of the elements of an array.

This is very similar for observables, except the values of an observable are what we are sending when updating it through the next method, and the function registered using map gets applied to all emitted values.
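To make the parallel concrete, the sketch below applies the same function to an array with Array.prototype.map and then, by hand, to the values emitted by the Observable class from earlier. The double function and the variable names are only illustrative.

```javascript
class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }
}

const double = value => value * 2;

// With an array, all of the values are already there
const doubledArray = [1, 2, 3].map(double);

console.log(doubledArray);
// [2, 4, 6]

// With an observable, the function is applied as values arrive
const source = new Observable();
const doubledOverTime = [];

source.subscribe(newValue => {
  doubledOverTime.push(double(newValue));
});

source.next(1);
source.next(2);
source.next(3);

console.log(doubledOverTime);
// [2, 4, 6]
```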

Let's try to implement our own map operator from scratch using what we already have so far.

const map = (update) => {
  return (observable) => {
    const newObservable = new Observable();

    observable.subscribe(newValue => {
      const newUpdatedValue = update(newValue);

      newObservable.next(newUpdatedValue);
    });

    return newObservable;
  };
};

The function returned by map subscribes to the source observable and returns a new observable. Each time the source emits a value, the new observable emits the result of applying our update function to it.

As you can see, we created a function that returns a function. This is very common in functional programming and is called a higher-order function: a function that accepts a function as an argument, returns a function, or both.
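If higher-order functions are new to you, here is a small sketch outside of any observable. The multiplyBy, twice and addOne names are made up for this example.

```javascript
// Returns a function: the factor is remembered by the returned function
const multiplyBy = (factor) => {
  return (value) => value * factor;
};

const triple = multiplyBy(3);

console.log(triple(4));
// 12

// Accepts a function and returns a function
const twice = (fn) => {
  return (value) => fn(fn(value));
};

const addOne = value => value + 1;
const addTwo = twice(addOne);

console.log(addTwo(5));
// 7
```

Our map operator has the same shape: it remembers the update function and returns a function that is waiting for an observable.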

Now, all we need is a way to apply operators to our original observable.

class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }

  pipe(...operators) {
    return operators.reduce((observable, operator) => {
      return operator(observable);
    }, this);
  }
}

Note that we could have written the pipe method in a more imperative way like so.

  pipe(...operators) {
    let observable = this;

    for (const operator of operators) {
      observable = operator(observable);
    }

    return observable;
  }

We updated our observable a little bit by adding a pipe method that takes any number of operators and returns the observable created by the last operator.

Each operator listens to the observable returned by the previous one, thus creating a long chain of computed values and subscribers.

If we were to rewrite the email example using this method, it would look a little bit shorter (and not much harder to understand).

const email = new Observable();

const trimmedEmail = email.pipe(
  map(email => {
    return email.trim();
  })
);

trimmedEmail.subscribe(newTrimmedEmail => {
  console.log(newTrimmedEmail);
});

email.next("    email@domain.com    ");

And now for a slightly more complex example use case.

const value = new Observable();

const computedValue = value.pipe(
  map(value => value * 2),
  map(value => value + 1)
);

computedValue.subscribe(newValue => {
  console.log(newValue);
  // 3
  // 5
  // 7
});

value.next(1);
value.next(2);
value.next(3);

In this case, we could have written this pipe by hand by nesting the operator calls ourselves.

const observable = new Observable();

map(value => value + 1)(map(value => value * 2)(observable))

This is essentially what the reduce does in the pipe method of our Observable class.
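The same reduction can be seen on plain functions and numbers, without any observable involved. In the sketch below, pipeValue, double and increment are hypothetical names used only for illustration.

```javascript
const double = value => value * 2;
const increment = value => value + 1;

// Nesting the calls by hand: the innermost call runs first
const byHand = increment(double(5));

// Letting reduce do the nesting: each function receives the previous result
const pipeValue = (initialValue, ...functions) => {
  return functions.reduce((accumulated, fn) => {
    return fn(accumulated);
  }, initialValue);
};

const withReduce = pipeValue(5, double, increment);

console.log(byHand);
// 11
console.log(withReduce);
// 11
```

In the pipe method, the initial value is the observable itself and each function is an operator that turns one observable into the next.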

As you can see, our computedValue observable is an observable that gets its value from the original one.

Can you guess how many observables are created here? There are 3 observables in total. Why?

Because the first one is our original observable, I think you got this one.

The second is the observable returned by the first call to the map operator. And the third is the one returned by the second call to the map operator. Leading us to a grand total of 3 observables.
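If you want to check that count yourself, here is a sketch that adds a counter to the constructor. The createdObservables variable is an addition made for this example only and is not part of the class used in the rest of the article.

```javascript
let createdObservables = 0;

class Observable {
  constructor() {
    this.subscribers = [];
    // Count every instance, including the ones created by operators
    createdObservables += 1;
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }

  pipe(...operators) {
    return operators.reduce((observable, operator) => {
      return operator(observable);
    }, this);
  }
}

const map = (update) => {
  return (observable) => {
    const newObservable = new Observable();

    observable.subscribe(newValue => {
      newObservable.next(update(newValue));
    });

    return newObservable;
  };
};

const value = new Observable();

value.pipe(
  map(value => value * 2),
  map(value => value + 1)
);

console.log(createdObservables);
// 3
```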

All of these observables and subscriptions are now hidden from us, leaving us fewer things to worry about so that we can focus on creating application logic and algorithms more easily.

More?

What about a filter operator? Let's try to create one!

const filter = (predicate) => {
  return (observable) => {
    const newObservable = new Observable();

    observable.subscribe(newValue => {
      const itShouldEmit = predicate(newValue);

      if (itShouldEmit) {
        newObservable.next(newValue);
      }
    });

    return newObservable;
  };
};

As you can see here, the logic of this operator is very similar to the map operator. This is because, again, an operator will take whatever it needs and will return a function that will return an observable.

What's really important is what we want to do here.

As with the Array.prototype.filter method, we want to limit the values that are emitted by the observable by applying the logic of a predicate.

A predicate is simply a condition which will be applied to every value emitted by an observable.

So we can take a look at an example to see it in action.

const value = new Observable();

const computedValue = value.pipe(
  map(value => value * 3),
  filter(value => value % 2 === 0)
);

computedValue.subscribe(newValue => {
  console.log(newValue);
  // 6
  // 12
});

value.next(1);
value.next(2);
value.next(3);
value.next(4);

Hmm, weird. We only have two logs in the console...

I thought that when an observable emitted a value, it would be received by the subscriber?

This is true, but in this case we subscribed to the observable returned by the filter operator, not to the original one.

And the filter operator only emits a value if it satisfies the condition. Here, the condition is that the value emitted by the map operator should be an even number.

Here is a diagram for a better understanding of what is happening here.

// 1 * 3 = 3  | 3  is not even | 3  won't be emitted (skipped)
// 2 * 3 = 6  | 6  is     even | 6  will  be emitted (kept   )
// 3 * 3 = 9  | 9  is not even | 9  won't be emitted (skipped)
// 4 * 3 = 12 | 12 is     even | 12 will  be emitted (kept   )

From there, if you understood both filter and map, you can start to see that the only limit is your use case, since we could create operators for many things such as rate limiting, debouncing, merging, etc.
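As an example, here is a minimal sketch of what merging could look like with the Observable class from this article. The merge function below is a hypothetical helper written for this example. It is a plain function rather than an operator to use in pipe, and it is not the RxJS implementation.

```javascript
class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }
}

// Returns a new observable that emits whenever any of the sources emits
const merge = (...observables) => {
  const mergedObservable = new Observable();

  observables.forEach(observable => {
    observable.subscribe(newValue => {
      mergedObservable.next(newValue);
    });
  });

  return mergedObservable;
};

const clicks = new Observable();
const keys = new Observable();
const received = [];

merge(clicks, keys).subscribe(newValue => {
  received.push(newValue);
});

clicks.next("click");
keys.next("key");
clicks.next("click");

console.log(received);
// ["click", "key", "click"]
```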

Final source-code

If you want a single file that contains all of the code used in this article, here is what it looks like in JavaScript.

class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }

  pipe(...operators) {
    return operators.reduce((observable, operator) => {
      return operator(observable);
    }, this);
  }
}

const map = (update) => {
  return (observable) => {
    const newObservable = new Observable();

    observable.subscribe(newValue => {
      const newUpdatedValue = update(newValue);

      newObservable.next(newUpdatedValue);
    });

    return newObservable;
  };
};

const filter = (predicate) => {
  return (observable) => {
    const newObservable = new Observable();

    observable.subscribe(newValue => {
      const itShouldEmit = predicate(newValue);

      if (itShouldEmit) {
        newObservable.next(newValue);
      }
    });

    return newObservable;
  };
};

For reference, here is what it could look like if we were to rewrite the source-code using TypeScript (with minor refactoring for TypeScript).

type Subscriber<Type> = (newValue: Type) => void;
type Operator<Type> = (observable: Observable<Type>) => Observable<Type>;

class Observable<Type> {
    private subscribers: Array<Subscriber<Type>>;

    constructor() {
        this.subscribers = [];
    }

    subscribe(subscriber: Subscriber<Type>): void {
        this.subscribers.push(subscriber);
    }

    next(newValue: Type): void {
        this.subscribers.forEach(subscriber => {
            subscriber(newValue);
        });
    }

    pipe(...operators: Array<Operator<Type>>): Observable<Type> {
        const initialObservable: Observable<Type> = this;

        return operators.reduce((observable, operator) => {
            return operator(observable);
        }, initialObservable);
    }
}

type OperatorMapUpdate<Type> = (value: Type) => Type;

const map = <Type>(update: OperatorMapUpdate<Type>) => {
    return (observable: Observable<Type>) => {
        // Pass the type argument explicitly, otherwise it is inferred as unknown
        const newObservable = new Observable<Type>();

        observable.subscribe(newValue => {
            const newUpdatedValue = update(newValue);

            newObservable.next(newUpdatedValue);
        });

        return newObservable;
    };
};

type OperatorFilterPredicate<Type> = (value: Type) => boolean;

const filter = <Type>(predicate: OperatorFilterPredicate<Type>) => {
    return (observable: Observable<Type>) => {
        // Pass the type argument explicitly, otherwise it is inferred as unknown
        const newObservable = new Observable<Type>();

        observable.subscribe(newValue => {
            const itShouldEmit = predicate(newValue);

            if (itShouldEmit) {
                newObservable.next(newValue);
            }
        });

        return newObservable;
    };
};

Keep in mind that this is not production-ready code. It is here for completeness and to give you the bigger picture.

You should stick to RxJS or other alternatives if you truly want to use observables in a production environment.
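As one example of what is missing, nothing in the code above lets a subscriber stop listening. Here is a minimal sketch of one way to add that, assuming that subscribe returns a function that removes the subscriber. This is a design chosen for this example only (RxJS returns a Subscription object with an unsubscribe method instead).

```javascript
class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);

    // Calling the returned function removes this subscriber
    return () => {
      this.subscribers = this.subscribers.filter(currentSubscriber => {
        return currentSubscriber !== subscriber;
      });
    };
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }
}

const counter = new Observable();
const received = [];

const unsubscribe = counter.subscribe(newValue => {
  received.push(newValue);
});

counter.next(1);
unsubscribe();
counter.next(2);

console.log(received);
// [1]
```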

Closing thoughts

That's it!

You now have a better understanding of what an observable is, and how operators work in RxJS.

Note that Observables and operators from RxJS are two different concepts.

Observables are widely used in programming and they are not tied to RxJS. In fact, the true power of RxJS is not Observables but the operators, which allow you to transform any data using a broad range of algorithms.

Also note that this article has been simplified to allow beginners or daily users of RxJS to better understand Observables and operators, but there is way more to it than you think.

One great advantage of using Observables over other composable data structures (Arrays, Objects, Promises) is that they can represent anything that is immediate or delayed in time: an integer, a boolean, an object, a Promise, a WebSocket, a Stream, an Event Source, a DOM Event... You name it.

It allows for having a uniform API across all those data structures.

I hope this article was useful, and if you have questions, don't hesitate to ask them below!

Challenge

If you really want to understand how observables and operators work under the hood, you should go and create your own set of observables and operators yourself.

Once this is done, try to create a scan operator.

This operator will take an observable, and will emit the accumulated values that are emitted from that observable, such that if we emit the following values.

observable.next(1);
observable.next(2);
observable.next(3);
observable.next(4);
observable.next(5);

You should receive the following values when subscribing to the observable returned by the scan operator.

observable.subscribe(newValue => {
  console.log(newValue);
  // 1
  // 3
  // 6
  // 10
  // 15
});

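And here is what using it in a longer pipeline should look like. Note that this pipeline doubles every value before accumulating, so its output starts with 2, 6, 12 rather than 1, 3, 6.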

const observable = new Observable();

const newObservable = observable.pipe(
  map(value => value * 2),
  filter(value => value % 2 === 0),
  scan((accumulatedValue, value) => accumulatedValue + value, 0)
);

newObservable.subscribe(newValue => {
  console.log(newValue);
});

Array.from(Array(10)).forEach((_, index) => {
  observable.next(index + 1);
});
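If you want to compare your attempt with one possible solution, here is a minimal sketch of scan written in the same style as map and filter. It is only one way to do it. The accumulate and initialValue parameter names are choices made for this example, and the accumulated value is kept in a variable shared by everything that listens to the returned observable.

```javascript
class Observable {
  constructor() {
    this.subscribers = [];
  }

  subscribe(subscriber) {
    this.subscribers.push(subscriber);
  }

  next(newValue) {
    this.subscribers.forEach(subscriber => {
      subscriber(newValue);
    });
  }

  pipe(...operators) {
    return operators.reduce((observable, operator) => {
      return operator(observable);
    }, this);
  }
}

const scan = (accumulate, initialValue) => {
  return (observable) => {
    const newObservable = new Observable();

    // Remember the running total between two emissions
    let accumulatedValue = initialValue;

    observable.subscribe(newValue => {
      accumulatedValue = accumulate(accumulatedValue, newValue);

      newObservable.next(accumulatedValue);
    });

    return newObservable;
  };
};

const observable = new Observable();
const received = [];

const summedObservable = observable.pipe(
  scan((accumulatedValue, value) => accumulatedValue + value, 0)
);

summedObservable.subscribe(newValue => {
  received.push(newValue);
});

[1, 2, 3, 4, 5].forEach(value => {
  observable.next(value);
});

console.log(received);
// [1, 3, 6, 10, 15]
```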
