
The Publisher Subscriber pattern

James Robb ・5 min read

The Publisher Subscriber pattern, also known as PubSub, is an architectural pattern for relaying messages to interested parties through a publisher. The publisher is not generally aware of the subscribers per se, but in our implementation it will be so that we can ease into the topic.

The PubSub pattern gives us a scalable way to relay messages around our applications, but it is inflexible in one area: the data structure that is sent to each subscriber when a new message is published. In my opinion this is generally a good thing, since it gives us a nice, normalised way of passing data through our applications.

Tests

For the tests I will be using JavaScript and the Jest test runner.

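// Publisher is a named export of the implementation module
const { Publisher } = require('./publisher');

let publisher;
// Create a fresh Publisher before every test
beforeEach(() => {
  publisher = new Publisher();
});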

describe("Publisher", () => {
  it("Should construct with default values", () => {
    expect(publisher.topic).toEqual("unknown");
    expect(publisher.subscribers).toEqual([]);
  });

  it("Should add subscribers properly", () => {
    const subscriber = jest.fn();
    expect(publisher.subscribers.length).toEqual(0);
    publisher.subscribe(subscriber);
    expect(publisher.subscribers.length).toEqual(1);
  });

  it("Should publish updates to subscribers", () => {
    const subscriber = jest.fn();
    publisher.subscribe(subscriber);
    publisher.publish("test");
    expect(subscriber).toHaveBeenCalledWith({
      topic: "unknown",
      data: "test"
    });
  });

  it("Should unsubscribe from updates as required", () => {
    const subscriber = jest.fn();
    const subscription = publisher.subscribe(subscriber);
    publisher.publish("test");
    expect(subscriber).toHaveBeenCalledTimes(1);
    publisher.unsubscribe(subscription);
    publisher.publish("test");
    expect(subscriber).toHaveBeenCalledTimes(1);
  });

  it("Should not unsubscribe a subscriber from updates unless it exists", () => {
    const subscriber = jest.fn();
    publisher.subscribe(subscriber);
    expect(publisher.subscribers.length).toEqual(1);
    publisher.unsubscribe(() => 24);
    expect(publisher.subscribers.length).toEqual(1);
  });

  it("Generates a consistent subscription id for each subscriber", () => {
    const subscriber = jest.fn();
    const subscription = publisher.subscribe(subscriber);
    const proof = publisher.createSubscriptionId(subscriber);
    expect(subscription).toEqual(proof);
  });
});

Here we test that:

  1. We begin with sane defaults
  2. We can add subscribers
  3. We can notify subscribers
  4. We can remove subscribers
  5. We only remove subscribers when they exist
  6. We generate consistent ids for each subscriber that is provided


This covers the bases required of a publisher and subscriber and gives us control over who does and does not get notifications when new content is published. Pretty simple so far, right?

Implementation

For our implementation I will be using TypeScript, a typed superset of JavaScript. If you are more comfortable with JavaScript you can compile TypeScript code to JavaScript in the TypeScript playground.

export interface ISubscriberOutput { 
  topic: string; 
  data: any; 
};

export class Publisher {
  public topic: string = "unknown";
  private subscribers: Function[] = [];

  public subscribe(subscriberFn: Function): number {
    this.subscribers = [...this.subscribers, subscriberFn];
    const subscriptionId = this.createSubscriptionId(subscriberFn);
    return subscriptionId;
  }

  public publish(data: any): void {
    this.subscribers.forEach((subscriberFn: Function) => {
      const output: ISubscriberOutput = { topic: this.topic, data };
      subscriberFn(output);
    });
  }

  public unsubscribe(subscriptionId: number): void {
    const subscriberFns = [...this.subscribers];
    subscriberFns.forEach((subscriberFn: Function, index: number) => {
      if(this.createSubscriptionId(subscriberFn) === subscriptionId) {
        subscriberFns.splice(index, 1);
        this.subscribers = [...subscriberFns];
      }
    });
  }

  private createSubscriptionId(subscriberFn: Function): number {
    const encodeString = this.topic + subscriberFn.toString();
    return [...encodeString].reduce((accumulator, char) => {
      return char.charCodeAt(0) + ((accumulator << 5) - accumulator);
    }, 0);
  }
}

This class generates a Publisher with a set of methods for us to use for publishing updates, subscribing to those updates and also unsubscribing when the need arises. Let's break things down from top to bottom.

export interface ISubscriberOutput { 
  topic: string; 
  data: any; 
};

Subscribers can use this interface to type the messages they receive when the publish method is called on the Publisher. It gives us the structured message output we discussed in the introduction of this article.
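As a small sketch of how a subscriber might consume this shape, here is a function typed against the interface. The recordMessage function and the received array are hypothetical names I have made up for illustration; they are not part of the implementation above.

```typescript
// The message shape from the article, repeated so the sketch is self-contained.
interface ISubscriberOutput {
  topic: string;
  data: any;
}

// Hypothetical store, used only to make the subscriber's effect visible.
const received: string[] = [];

// A subscriber is just a function that accepts the structured message.
function recordMessage(message: ISubscriberOutput): void {
  received.push(`[${message.topic}] ${String(message.data)}`);
}

// Call it by hand with the same shape that publish() passes along.
recordMessage({ topic: "unknown", data: "test" });

console.log(received); // ["[unknown] test"]
```

Typing the parameter this way means the compiler can flag a subscriber that reads a field the message does not have.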

  public topic: string = "unknown";
  private subscribers: Function[] = [];

As we begin to define the Publisher class, we first initialise the class with a topic of "unknown" since the topic hasn't been provided or overridden. We also have an array of subscribers initialised, each of which should be a Function.
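To illustrate what "provided or overridden" could look like in practice, here is a minimal sketch. It uses a trimmed stand-in for the Publisher class with only the two fields, and the ArticlePublisher subclass and the "news" topic are assumptions of mine rather than anything from the implementation above.

```typescript
// Trimmed stand-in for the article's Publisher, keeping only the default fields.
class Publisher {
  public topic: string = "unknown";
  protected subscribers: Function[] = [];
}

// Hypothetical subclass that overrides the default topic.
class ArticlePublisher extends Publisher {
  public topic: string = "articles";
}

const defaultPublisher = new Publisher();
const articlePublisher = new ArticlePublisher();

// Because topic is public, it can also be reassigned on an instance.
const renamedPublisher = new Publisher();
renamedPublisher.topic = "news";

console.log(defaultPublisher.topic); // "unknown"
console.log(articlePublisher.topic); // "articles"
console.log(renamedPublisher.topic); // "news"
```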

Next we create the subscribe method. This will add the provided subscriberFn function to the subscribers array and then return a subscriptionId for us to use later should we choose to unsubscribe down the road.

  public subscribe(subscriberFn: Function): number {
    this.subscribers = [...this.subscribers, subscriberFn];
    const subscriptionId = this.createSubscriptionId(subscriberFn);
    return subscriptionId;
  }

The createSubscriptionId method generates an ID for each subscriber and utilises the same algorithm as the Java String hashCode() method.

  private createSubscriptionId(subscriberFn: Function): number {
    const encodeString = this.topic + subscriberFn.toString();
    return [...encodeString].reduce((accumulator, char) => {
      return char.charCodeAt(0) + ((accumulator << 5) - accumulator);
    }, 0);
  }

In short we take the current topic and add to that the string representation of the subscriberFn. This gives us a somewhat unique string but is not bulletproof by any means. From here we take each character in the encodeString and reduce the whole string to a single numeric hash.
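To make the "not bulletproof" caveat concrete, here is a rough sketch using a standalone copy of the hashing logic. Taking the topic as a parameter is my own assumption so that the function runs outside the class. Two separate functions with identical source text end up with the same ID:

```typescript
// Standalone copy of the hashing logic, with the topic passed in explicitly.
function createSubscriptionId(topic: string, subscriberFn: Function): number {
  const encodeString = topic + subscriberFn.toString();
  return [...encodeString].reduce((accumulator, char) => {
    return char.charCodeAt(0) + ((accumulator << 5) - accumulator);
  }, 0);
}

// Two distinct function objects whose source text is identical.
const first = (message: unknown): void => console.log(message);
const second = (message: unknown): void => console.log(message);

const firstId = createSubscriptionId("unknown", first);
const secondId = createSubscriptionId("unknown", second);

console.log(first === second);     // false, they are different functions
console.log(firstId === secondId); // true, the hashed strings are the same
```

With the implementation in this article, that means unsubscribing with the ID of one of these functions would also match the other.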

Sidenote: We use the << operator, which seems to confuse a lot of developers, but for values that fit in a 32-bit integer it is in essence the same as writing accumulator * (2 ** 5).

So let's say it is the second iteration of the reduce loop and accumulator is at an arbitrary value of 65, which is the character code for uppercase "A".

This being the case the (accumulator << 5) - accumulator line is the same as writing (65 * (2 ** 5)) - 65 during that iteration.

Either way you write it, the resulting number will be 2015 for that example.
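If you would like to check that arithmetic yourself, the following sketch runs both forms side by side. The hashString helper is a hypothetical standalone version of the reduce from createSubscriptionId, without the topic prefix.

```typescript
const accumulator = 65; // character code for "A"

// The bitwise form used in the implementation.
const shifted = (accumulator << 5) - accumulator;
// The equivalent multiplication form.
const multiplied = accumulator * 2 ** 5 - accumulator;

console.log(shifted);          // 2015
console.log(multiplied);       // 2015
console.log(accumulator * 31); // 2015, since (x * 32) - x is x * 31

// Hypothetical standalone version of the reduce, hashing a plain string.
function hashString(text: string): number {
  return [...text].reduce((hash, char) => {
    return char.charCodeAt(0) + ((hash << 5) - hash);
  }, 0);
}

// "A" gives 65, then the second iteration adds 97 ("a") to 2015.
console.log(hashString("Aa")); // 2112
```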

Hopefully that makes a bit more sense if you have not used bitwise operators before.

I highly recommend reading the MDN bitwise reference for more information.

If we want to unsubscribe from a Publisher at any time, we can simply call the unsubscribe method, passing in the return value of the original subscribe call.

  public unsubscribe(subscriptionId: number): void {
    const subscriberFns = [...this.subscribers];
    subscriberFns.forEach((subscriberFn: Function, index: number) => {
      if(this.createSubscriptionId(subscriberFn) === subscriptionId) {
        subscriberFns.splice(index, 1);
        this.subscribers = [...subscriberFns];
      }
    });
  }

Here we clone the current subscribers and loop over the clone until we find one that, when hashed by the createSubscriptionId function, matches the provided subscriptionId value.

If we find a match then we remove that function from the subscriberFns array and set the subscribers to contain only the remaining subscriberFns.
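One thing to be aware of is that the splice happens on the same array that forEach is walking, so if the same function were subscribed twice, a single unsubscribe call would leave one copy behind. If that is not what you want, one possible alternative is to build the remaining list with filter. This is only a sketch: removeSubscriber, IdFn and the demo createId are hypothetical names of mine, and the ID function is passed in as a parameter so the helper can run outside the class.

```typescript
type IdFn = (subscriberFn: Function) => number;

// Returns a new array without any subscriber whose id matches subscriptionId.
function removeSubscriber(
  subscribers: Function[],
  subscriptionId: number,
  createId: IdFn
): Function[] {
  return subscribers.filter(
    (subscriberFn) => createId(subscriberFn) !== subscriptionId
  );
}

// Demo id function: the same hashing as the article with a fixed "unknown" topic.
const createId: IdFn = (subscriberFn) =>
  [...("unknown" + subscriberFn.toString())].reduce(
    (accumulator, char) => char.charCodeAt(0) + ((accumulator << 5) - accumulator),
    0
  );

const keep = (): string => "keep";
const drop = (): string => "drop";

// The drop function is subscribed twice; both entries are removed in one pass.
const remaining = removeSubscriber([keep, drop, drop], createId(drop), createId);

console.log(remaining.length);      // 1
console.log(remaining[0] === keep); // true
```

Whether removing every match or only one is the right behaviour depends on what you are building, so treat this as one option rather than a fix.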

Lastly we will look at the publish function, which takes in whatever data you wish to broadcast to the subscribers.

  public publish(data: any): void {
    this.subscribers.forEach((subscriberFn: Function) => {
      const output: ISubscriberOutput = { topic: this.topic, data };
      subscriberFn(output);
    });
  }

We loop over the current subscribers and notify each one with an object matching the ISubscriberOutput structure.

Overall this implementation keeps things concise and to the point.

Example usage

An example use case could be an article publisher which notifies subscribers when new articles get published.
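Here is a minimal sketch of that article publisher idea. It repeats a condensed copy of the Publisher class so that it runs on its own. The ArticlePublisher subclass, the inbox array and the article titles are all assumptions I have made up for illustration.

```typescript
interface ISubscriberOutput {
  topic: string;
  data: any;
}

// Condensed copy of the article's Publisher so the example is self-contained.
class Publisher {
  public topic: string = "unknown";
  private subscribers: Function[] = [];

  public subscribe(subscriberFn: Function): number {
    this.subscribers = [...this.subscribers, subscriberFn];
    return this.createSubscriptionId(subscriberFn);
  }

  public publish(data: any): void {
    this.subscribers.forEach((subscriberFn: Function) => {
      const output: ISubscriberOutput = { topic: this.topic, data };
      subscriberFn(output);
    });
  }

  public unsubscribe(subscriptionId: number): void {
    const subscriberFns = [...this.subscribers];
    subscriberFns.forEach((subscriberFn: Function, index: number) => {
      if (this.createSubscriptionId(subscriberFn) === subscriptionId) {
        subscriberFns.splice(index, 1);
        this.subscribers = [...subscriberFns];
      }
    });
  }

  private createSubscriptionId(subscriberFn: Function): number {
    const encodeString = this.topic + subscriberFn.toString();
    return [...encodeString].reduce((accumulator, char) => {
      return char.charCodeAt(0) + ((accumulator << 5) - accumulator);
    }, 0);
  }
}

// Hypothetical publisher for articles, overriding the default topic.
class ArticlePublisher extends Publisher {
  public topic: string = "articles";
}

const articles = new ArticlePublisher();
const inbox: string[] = []; // stands in for a reader's notification list

// Subscribe, keeping the returned id so we can unsubscribe later.
const subscription = articles.subscribe((message: ISubscriberOutput) => {
  inbox.push(`${message.topic}: ${message.data.title}`);
});

articles.publish({ title: "The Publisher Subscriber pattern" });
articles.unsubscribe(subscription);
articles.publish({ title: "This one is not delivered" });

console.log(inbox); // ["articles: The Publisher Subscriber pattern"]
```

The subscriber only ever sees the first article because it unsubscribes before the second one is published.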

Conclusions

I like this pattern: it gives us a scalable and predictable messaging format while staying flexible enough to fit the needs of what you are building.

I think this ties in nicely with other architectural patterns like the microservices pattern which uses event queues to pass information around in a way that is not too dissimilar to PubSub.

Hopefully you found some value in today's post and you can make use of this pattern in the future!

Discussion

Tamimi Ahmad

I like how you explained it - thanks for sharing!

James Robb Author

You're welcome Tamimi, I'm glad you found value in the post, thanks for dropping by!

Tamimi Ahmad

I've recently been getting more into pub/sub and event-driven development. I attempted to build an event-driven NodeJS application on covid data that I wrote a blog post about here if you want to check it out: dev.to/tweettamimi/how-i-built-an-...! It's pretty cool and I'm planning to build more side projects with it.

James Robb Author

I'll be sure to check that out, thanks for the share.