Eugene Ghanizadeh

Posted on • Originally published at loreanvictor.github.io

What are Callbags?

TL;DR: A callback is a function we pass to another function (a source) to give that source a way to talk back to us (call us back). Callbag is a standard for callbacks that enables working with streams. A callbag is any callback that follows that standard.


Callbacks

Take this code:

console.log(source());

When we run this code:

  1. source() is called. We wait for its output.
  2. We log the output from source().

What if source() takes some time to produce data? Instead of waiting for it, we could tell it to "give us data" (by calling it), and give it a way to call us back when it has some data:

source(data => console.log(data));

Here, data => console.log(data) is a callback, as it's the means we give source() to call us back when it has data. source is basically a source of data, and we can communicate with it as follows:

-> [GREETING] "Give me data when you have it" # --> us to source
<- [DATA]     "Here is some data"             # --> source to us
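
To make the exchange concrete, here is a minimal sketch of what such a source could look like. The 100ms delay and the 'some data' value are assumptions made purely for illustration; the article does not specify what source() does internally.

```javascript
// Hypothetical source: pretends to need 100ms to produce its data.
const source = callback => {
  setTimeout(() => callback('some data'), 100);
};

source(data => console.log(data)); // us to source: "give me data when you have it"
console.log('not waiting ...');    // runs first, since source() returns immediately
```

The second log line appears before the data does, which is the whole point: we are no longer blocked while the source works.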

Streams

Now what if our source (e.g. source()) produces an indeterminate number of data entries? For example, our source might be a function that calculates the position of the cursor on the X-axis, or a function that gives us messages coming from a WebSocket.

👉 A source that produces an indeterminate number of data entries at indeterminate time intervals is called a stream.

In this case our simplistic callback (or the communication scheme) will be rather limiting:

  • We might want to tell the source to stop sending more data.
  • The source might want to tell us that it won't send more data (maybe due to some error).
  • Some sources push data whenever possible. Others might wait for us to ask them explicitly for more data, in which case we would need to be able to ask for more data as well.

None of these are available under our previous communication scheme, and we need an expanded communication scheme to be able to work properly with streams:

-> [GREETING] "Give me data whenever you have some"                              # --> us to source
<- [GREETING] "I will give you data whenever I have some. Tell me when to stop"  # --> source to us

<- [DATA]     "Here is some data"                                                # --> source to us
-> [DATA]     "Give me more data"                                                # --> us to source, when it needs to be pulled

-> [END]      "Stop sending more data"                                           # --> us to source
<- [END]      "I won't be sending more data (because of X)"                      # --> source to us

To accommodate it, we can have our callback accept two arguments instead of one: the first argument denotes the type of the message, the second its content (or payload):

source((type, payload) => {
  if (type === GREET) console.log('Listening ...');
  if (type === DATA) console.log(payload);
  if (type === END) console.log('Source ended!');
});

Callbags

Callbag is just a term to denote any callback (or function) that looks like the callback we just designed for talking with source(). In other words, any function with the following signature is a callbag:

(type: GREET | DATA | END, payload?: any) => void;

👉 In the callbag spec, message types are denoted by numbers:

  • 0 stands for GREET (also called START)
  • 1 stands for DATA
  • 2 stands for END.
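
The snippets in this article use the names GREET, DATA and END without defining them. A minimal sketch of how they could be declared, together with a listener callbag that we drive by hand, might look like this. The `log` array and the hand-written calls are illustrative assumptions, not part of the spec.

```javascript
// Message types as plain numeric constants, matching the list above.
const GREET = 0; // also called START
const DATA = 1;
const END = 2;

const log = [];

// A callbag: it accepts (type, payload) and reacts to each message type.
const listener = (type, payload) => {
  if (type === GREET) log.push('Listening ...');
  if (type === DATA) log.push(payload);
  if (type === END) log.push('Source ended!');
};

// Playing the role of a source by hand. A real source would pass a
// talkback callbag along with GREET; a no-op stands in for it here.
listener(GREET, () => {});
listener(DATA, 'hello');
listener(DATA, 'world');
listener(END);

console.log(log); // [ 'Listening ...', 'hello', 'world', 'Source ended!' ]
```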

Now let's look at the above example again:

source((type, payload) => {
  if (type === GREET) console.log('Listening ...');
  if (type === DATA) console.log(payload);
  if (type === END) console.log('Source ended!');
});

Here, source is NOT a callbag, since it only accepts one argument. We can fix that by making source accept two arguments as well, in which case our code changes like this:

source(GREET, (type, payload) => {
  if (type === GREET) console.log('Listening ...');
  if (type === DATA) console.log(payload);
  if (type === END) console.log('Source ended!');
});

Now what if we want to receive a limited number of data entries (say 5) from source? We greeted source by calling it with GREET alongside a callbag. According to our communication scheme, source needs to also greet us by sending us GREET alongside a way to tell it to stop, i.e. another callbag:

let talkback;
let N = 0;

source(GREET, (type, payload) => {
  if (type === GREET) {
    talkback = payload;                // --> when type is GREET, payload is a callbag
    console.log('Listening ...');
  }

  if (type === DATA) {
    console.log(payload);             // --> when type is DATA, payload is the data sent by source
    N++;
    if (N >= 5) talkback(END);        // --> telling the source to stop
  }

  if (type === END) console.log('Source ended!');
});

👉 So whenever someone greets someone (us greeting the source, the source greeting us), the payload should be another callbag, acting as a way to talk back to the greeter. In this example, talkback plays that role.
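
The same talkback can also carry the "give me more data" message from the communication scheme, for sources that wait to be asked. The following is only a sketch under assumptions: `fromArray` is a hypothetical pullable source written for this example, not a function taken from any library.

```javascript
const GREET = 0, DATA = 1, END = 2;

// Hypothetical pullable source: hands out one array item per request.
const fromArray = items => (type, sink) => {
  if (type !== GREET) return;
  let i = 0;
  let done = false;

  // Greet back with a talkback that understands DATA (pull) and END (stop).
  sink(GREET, t => {
    if (t === END) done = true;
    if (t === DATA && !done) {
      if (i < items.length) sink(DATA, items[i++]);
      else { done = true; sink(END); }
    }
  });
};

const received = [];
let talkback;

fromArray(['a', 'b', 'c'])(GREET, (type, payload) => {
  if (type === GREET) {
    talkback = payload;
    talkback(DATA);            // ask for the first entry
  }
  if (type === DATA) {
    received.push(payload);
    talkback(DATA);            // ask for the next entry
  }
  if (type === END) received.push('end');
});

console.log(received); // [ 'a', 'b', 'c', 'end' ]
```

Nothing is emitted until the listener asks, so the listener sets the pace instead of the source.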


Callbag Sources

So far we've just worked with source() as a stream, without looking inside it. But what would a callbag source actually look like? To see that, let's build a simple callbag source that outputs an increasing number every second:

const source = (type, payload) => {
  if (type === GREET) {                           // --> everything starts with a GREET
    let talkback = payload;                       // --> when greeted, the payload is a way to talk back to the greeter
    let i = 0;

    setInterval(() => talkback(DATA, i++), 1000); // --> let's tell the greeter about our increasing number every second
  }
}

☝️ Here, we are not giving the caller any way of telling the source to stop sending more data. This is because we are not following the communication protocol properly: the source MUST greet back and provide a way of talking back (i.e. another callbag):

const source = (type, payload) => {
  if (type === GREET) {
    let talkback = payload;
    let i = 0;

    const interval = setInterval(() => talkback(DATA, i++), 1000);

    talkback(GREET, (_type, _payload) => {
      if (_type === END) clearInterval(interval);
    });
  }
}
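
Putting this source together with the five-entry listener from earlier gives a complete program. The sketch below assumes two small changes made only so the example finishes quickly and is easy to inspect: the period is a parameter (`makeSource` is a hypothetical wrapper), and entries are collected into a `received` array in addition to being logged.

```javascript
const GREET = 0, DATA = 1, END = 2;

// Same shape as the source above, with a configurable period.
const makeSource = periodMs => (type, payload) => {
  if (type !== GREET) return;
  const sink = payload;
  let i = 0;

  const interval = setInterval(() => sink(DATA, i++), periodMs);

  // Greet back, handing the listener a way to stop us.
  sink(GREET, _type => {
    if (_type === END) clearInterval(interval);
  });
};

const received = [];
let talkback;

makeSource(10)(GREET, (type, payload) => {
  if (type === GREET) talkback = payload;
  if (type === DATA) {
    received.push(payload);
    console.log(payload);
    if (received.length >= 5) talkback(END); // stops the interval
  }
});
```

Because the talkback clears the interval, the program logs 0 through 4 and then exits on its own instead of ticking forever.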


Callbags in Practice

In practice, you rarely need to greet sources or handle talkbacks manually. Utilities such as those provided in callbag-common take care of that for you:

import { interval, pipe, map, filter, subscribe } from 'callbag-common'

const source = interval(1000) // --> emits every second
pipe(
  source,
  map(x => x * 3),            // --> multiply by 3
  filter(x => x % 2),         // --> only allow odd numbers
  subscribe(console.log)      // --> log any incoming number
)

> 3
> 9
> 15
> 21
> 27

The workflow is typically like this:

👉 You create some callbag sources, using source factories:

import { interval } from 'callbag-common';

const source = interval(1000);

👉 You then transform these sources using operators.
For example, you might want to multiply each received number by 3:

import { interval, map } from 'callbag-common';

let source = interval(1000);
source = map(n => n * 3)(source);

Or you might want to only pick odd numbers:

import { interval, map, filter } from 'callbag-common';

let source = interval(1000);
source = map(n => n * 3)(source);
source = filter(n => n % 2)(source);
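
Under the hood, an operator is itself just a function that takes a callbag source and returns a new one. As a rough sketch of the idea (not callbag-common's actual implementation), a map operator could be hand-written as below. The `numbers` source is a hypothetical synchronous source used only to exercise it.

```javascript
const GREET = 0, DATA = 1, END = 2;

// Sketch of an operator: fn => (source => new source).
const map = fn => source => (type, sink) => {
  if (type !== GREET) return;
  source(GREET, (t, payload) => {
    // Transform data; pass greetings (with their talkback) and endings through.
    sink(t, t === DATA ? fn(payload) : payload);
  });
};

// Hypothetical source that synchronously emits 1, 2, 3 and then ends.
const numbers = (type, sink) => {
  if (type !== GREET) return;
  let stopped = false;
  sink(GREET, t => { if (t === END) stopped = true; });
  for (const n of [1, 2, 3]) {
    if (stopped) return;
    sink(DATA, n);
  }
  if (!stopped) sink(END);
};

const out = [];
map(n => n * 3)(numbers)(GREET, (t, payload) => {
  if (t === DATA) out.push(payload);
});

console.log(out); // [ 3, 6, 9 ]
```

Since the greeting is forwarded untouched, the listener receives the original source's talkback and can still tell it to stop.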

👉 Finally, you start listening to your transformed source by subscribing to it:

import { interval, map, filter, subscribe } from 'callbag-common';

let source = interval(1000);
source = map(n => n * 3)(source);
source = filter(n => n % 2)(source);

subscribe(console.log)(source);

👉 It is also highly recommended to use the pipe() utility for transforming your sources and subscribing to them, as it makes the code much easier to read:

import { interval, map, filter, subscribe, pipe } from 'callbag-common';

pipe(
  interval(1000),
  map(n => n * 3),
  filter(n => n % 2),
  subscribe(console.log)
)
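
To see why the piped form is equivalent to the step-by-step reassignment shown before it, it helps to picture pipe() as feeding its first argument through each following function in turn. This is a simplified sketch of that idea, not the library's actual code, and plain numbers stand in for sources to keep it short.

```javascript
// Sketch: pipe(x, f, g) behaves like g(f(x)).
const pipe = (first, ...fns) => fns.reduce((acc, fn) => fn(acc), first);

const result = pipe(
  2,
  n => n * 3,
  n => n + 1
);

console.log(result); // 7
```

With callbags, the first argument is a source, the middle functions are operators such as map(...) and filter(...), and the last one is subscribe(...).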