DEV Community

loading...

Homemade observables. Part 2: Composition

vonheikemen profile image Heiker Updated on ・6 min read

In the last post we learned about Observables, what are they and how we could build one from scratch. Now we will learn how we can manipulate existing Observables to extend their behavior.

This time we'll create some utility functions, and tweak a little bit our current Observable implementation, in order to create more flexible features with them.

It all starts with operators

Operators are functions that allow us to extend the behavior of an observable with a chain of functions. Each of this functions can take an observable as a data source and returns a new observable.

Lets keep the array theme in here and create a map operator that emulates the native map function of the Array prototype, but for observables. Our operator will do this: take a value, apply a function that will perform some transformation and return a new value.

Lets give it a try:

First step, get the transform function and the data source, then return a new observable that we can use.

function map(transformFn, source$) {
  return Observable(function(observer) {
    // to be continued...
  });
}

Here comes the cool part, the source that we get is an observable and that means we can subscribe to it to get some values.

function map(transformFn, source$) {
  return Observable(function(observer) {
    // remember to keep returning values from your functions.
    // This will return the unsubcribe function
    return source$.subscribe(function(value) {
      // to be continued...
    });
  });
}

Now we need to pass the result of the transformation to the observer so we can "see" it when we subscribe to this new observable.

function map(transformFn, source$) {
  return Observable(function(observer) {
    return source$.subscribe(function(value) {
      // ****** WE ARE HERE ******
      var newValue = transformFn(value);
      observer.next(newValue);
      // *************************
    });
  });
}

There is a lot of indentation and returns going on in here. We can "fix" that if we use arrow functions all the way.

function map(transformFn, source$) {
  return Observable(observer => 
    source$.subscribe(value => observer.next(
      transformFn(value)
    ))
  );
}

// that didn't do much for the indentation. 
// Well, you can't win them all.

We still need to use the operator and right now this will be it.

function fromArray(arr) {
  return Observable(function(observer) {
    arr.forEach(value => observer.next(value));
    observer.complete();
  });
}

var thisArray = [1, 2, 3, 4];
var plusOne   = num => num + 1;
var array$    = map(plusOne, fromArray(thisArray));

array$.subscribe(value => console.log(value));

This doesn't feel very chainy. In order to use more of this map functions we would have to nest them, and that ain't right. Don't worry, we'll get to that in a moment.

Pipe all the things

We will create a helper function that will allow us to use one or more operators that can modify an observable source.

This function will take a collection of functions, and each function in the collection will use the return value of the previous function as an input.

First, I'm going to show how this could be done as a standalone helper function.

function pipe(aFunctionArray, initialSource) {
  var reducerFn = function(source, fn) {
    var result = fn(source);
    return result;
  };

  var finalResult = aFunctionArray.reduce(reducerFn, initialSource);

  return finalResult;
}

In here the reduce function loops over the array and for each element in it executes reducerFn. Inside reducerFn in the first loop, source will be initialSource and in the rest of the loops source will be whatever you return from reducerFn. The finalResult is just the last result returned from reducerFn.

With some modifications (ES6+ goodness included) we can use this helper function within our Observable factory to make it more flexible. Our new factory would now look like this:

function Observable (subscriber) {
  var observable = {
    subscribe: observer => subscriber(SafeObserver(observer)),
    pipe: function (...fns) {
      return fns.reduce((source, fn) => fn(source), observable);
    }
  }

  return observable; 
}

We need to do one more thing to make sure our operators are compatible with this new pipe function. For example, our current map operator expects both transformFn and source at the same time. That just won't happen inside pipe. Will have to split it into two functions, one that will take the initial necessary parameters to make it work and another one that takes the source observable.

There are a couple of ways we can do this.

// Option 1
function map(transformFn) {
  // Instead of returning an observable 
  // we return a function that expects a source
  return source$ => Observable(observer => 
    source$.subscribe(value => observer.next(
      transformFn(value)
    ))
  );
}

// Option 2
function map(transformFn, source$) {
  if(source$ === undefined) {
    // we'll return a function 
    // that will "remember" the transform function
    // and expect the source and put in its place.

    return placeholder => map(transformFn, placeholder);
  }

  return Observable(observer => 
    source$.subscribe(value => observer.next(
      transformFn(value)
    ))
  );
}

And finally we can extend our observable in this way:

var thisArray = [1, 2, 3, 4];
var plusOne   = num => num + 1;
var timesTwo  = num => num * 2;

var array$ = fromArray(thisArray).pipe(
  map(plusOne),
  map(timesTwo),
  map(num => `number: ${num}`),
  // ... many more operators
);

array$.subscribe(value => console.log(value));

Now we are ready to create more operators.

Exercise time

Lets say that we have a piece of code that prints a "time string" to the console every second, and stops after five seconds (because why not). This guy right here:

function startTimer() {
  var time = 0;
  var interval = setInterval(function() {
    time = time + 1;

    var minutes = Math.floor((time / 60) % 60).toString().padStart(2, '0');
    var seconds = Math.floor(time % 60).toString().padStart(2, '0');
    var timeString = minutes + ':' + seconds;

    console.log(timeString);

    if(timeString === '00:05') {
      clearInterval(interval);
    }
  }, 1000);
}

There is nothing wrong with this piece of code. I mean, it does the job, it's predictable, and everything you need to know about it is there in plain sight. But you know, we are in a refactoring mood and we just learned something new. We'll turn this into an observable thingy.

First things first, lets make a couple of helper function that handle the formatting and time calculations.

function paddedNumber(num) {
  return num.toString().padStart(2, '0');
}

function readableTime(time) {
  var minutes = Math.floor((time / 60) % 60);
  var seconds = Math.floor(time % 60);

  return paddedNumber(minutes) + ':' + paddedNumber(seconds);
}

Now lets handle the time. setInterval is a great candidate for a data source, it takes a callback in which we could produce values, it also has a "cleanup" mechanism. It just makes the perfect observable.

function interval(delay) {
  return Observable(function(observer) {
    var counter   = 0;
    var callback  = () => observer.next(counter++);
    var _interval = setInterval(callback, delay);

    observer.setUnsubscribe(() => clearInterval(_interval));

    return observer.unsubscribe;
  });
}

This is amazing, we now have really reusable way to set and destroy an interval.

You may have notice that we are passing a number to the observer, we are not calling it seconds because the delay can be any arbitrary number. In here we're not keeping track of the time, we are merely counting how many times the callback has been executed. Why? Because we want to make every observable factory as generic as possible. We can always modify the value that it emits by using operators.

This how we could use our new interval function.

// pretend we have our helper functions in scope.

var time$ = interval(1000).pipe(
  map(plusOne),
  map(readableTime)
);

var unsubscribe = time$.subscribe(function(timeString) {
  console.log(timeString);

  if(timeString === '00:05') {
    unsubscribe();
  }
});

That's better. But that if bothers me. I feel like that behavior doesn't belong in there. You know what? I'll make an operator that can unsubscribe to the interval after it emits five values.

// I'll named "take" because naming is hard.
// Also, that is how is called in other libraries.

function take(total) {
  return source$ => Observable(function(observer) {
    // we'll have our own counter because I don't trust in the values
    // that other observables emits
    var count = 0;
    var unsubscribeSource = source$.subscribe(function(value) {
      count++;
      // we pass every single value to the observer.
      // the subscribe function will still get every value in the stream 
      observer.next(value);

      if (count === total) {
        // we signal the completion of the stream and "destroy" the thing
        observer.complete();
        unsubscribeSource();
      }
    });
  });
}

Now we can have a self destructing timer. Finally.

// pretend we have our helper functions in scope.

var time$ = interval(1000).pipe(
  map(plusOne),
  map(readableTime),
  take(5)
);

time$.subscribe({
  next: timeString => console.log(timeString),
  complete: () => console.info("Time's up")
});

Playgrounds

I made a couple of pens so you can play around with this stuff. This pen contains all the Observable related code that I wrote for this posts and them some more.

And this is the pen for the exercise.

Conclusion

I'll said it again, Observables are a powerful abstraction. They can let you process streams of data one chunk at a time. Not only that, but also let you piece together solutions that can be compose by generic functions and custom functions specific to the problem at hand.

Fair warning though. They are not the ultimate solution to every problem. You'll have to decide if the complexity is worth it. Like in the exercise, we lose the simplicity of the startTimer in order to gain some flexibility (that we could've achieve some other way).

Other sources

Who’s Afraid of Observables?
Understanding mergeMap and switchMap in RxJS
JavaScript — Observables Under The Hood
Github repository - zen-observable
Understanding Observables

Thank you for reading.

Discussion

pic
Editor guide