DEV Community

loading...
Cover image for Elixir Pubsub In Less Than 50 Lines

Elixir Pubsub In Less Than 50 Lines

alexdesousa profile image Alex de Sousa Originally published at thebroken.link ・3 min read

:pg2 is a mostly unknown, but powerful Erlang module. It provides an API for creating process groups.

Process Group

So, what's a process group? Well... it's a group of Erlang/Elixir processes.

Perhaps, the correct question would be, why do we care about process groups? Well, process groups are the foundation for publisher-subscribers (pubsubs for short).

PG2

Understanding :pg2 API and how it relates to a pubsub API will make it easier to understand:

  • Every process group is a channel e.g. a group called :my_channel is created:
   iex> :pg2.create(:my_channel)
   :ok
  • Every process in a group is a subscriber e.g. self() is part of :my_channel group:
   iex> :pg2.join(:my_channel, self())
   :ok
  • A publisher can send/2 messages to a channel e.g. the publisher gets all the members of the group :my_channel and sends "Some message":
   iex> members = :pg2.get_members(:my_channel)
   :ok
   iex> for member <- members, do: send(member, "Some message")
  • A subscriber will receive the messages in its mailbox:
   iex> flush()
   "Some message"
   :ok
  • A subscriber can unsubscribe from a channel e.g. self() leaves the group :my_channel:
   iex> :pg2.leave(:my_channel, self())
   :ok
  • A channel can be deleted:
   iex> :pg2.delete(:my_channel)
   :ok

And that's it! That's the API. And you know what's the best thing about it? It can work between connected nodes. Keep reading and you'll see :)

Message in cereal

Implementing a PubSub

A PubSub has three main functions:

  • subscribe/1 for subscribing to a channel:
   def subscribe(channel) do
     pid = self()

     case :pg2.get_members(channel) do
       members when is_list(members) ->
         if pid in members do
           :ok                     # It's already subscribed.
         else
           :pg2.join(channel, pid) # Subscribes to channel
         end

       {:error, {:no_such_group, ^channel}} ->
         :pg2.create(channel)      # Creates channel
         :pg2.join(channel, pid)   # Subscribe to channel
     end
   end
  • unsubscribe/1 for unsubscribing from a channel.
    def unsubscribe(channel) do
      pid = self()

      case :pg2.get_members(channel) do
        [^pid] ->
          :pg2.leave(channel, pid)   # Unsubscribes from channel
          :pg2.delete(channel)       # Deletes the channel

        members when is_list(members) ->
          if pid in members do
            :pg2.leave(channel, pid) # Unsubscribes from channel
          else
            :ok                      # It's already unsubscribed
          end

        _ ->
          :ok
      end
    end
  • publish/2 for sending a message to a channel.
   def publish(channel, message) do
     case :pg2.get_members(channel) do
       [_ | _] = members ->
         for member <- members, do: send(member, message)
         :ok

       _ ->
         :ok
     end
   end

For a full implementation of PubSub you can check this gist.

I usually create a .iex.exs file in my $HOME folder and then run iex. You could do the same with the previous gist by doing the following:

~ $ PUBSUB="https://gist.githubusercontent.com/alexdesousa/4d592fe206cca17393affaefa4c8fd33/raw/4d84894f016bd9eef84bba647c77c62b9c9a6094/pub_sub.ex"
~ $ curl "$PUBSUB" -o .iex.exs
~ $ iex

It's that easy

Distributed PubSub

For our distributed experiment we'll need two nodes. My machine is called matrix and both nodes will be neo and trinity respectively:

  • :neo@matrix:
   alex@matrix ~ $ iex --sname neo
   iex(neo@matrix)1>
  • :trinity@matrix:
   alex@matrix ~ $ iex --sname trinity
   iex(trinity@matrix)1> Node.connect(:neo@matrix) # Connects both nodes

Now :neo@matrix can subscribe to :mainframe channel:

iex(neo@matrix)1> PubSub.subscribe(:mainframe)
:ok

And :trinity@matrix can send a message:

iex(trinity@matrix)2> PubSub.publish(:mainframe, "Wake up, Neo...")
:ok 

Note: Sometimes it takes a bit of time for nodes to synchronize their process groups, so you might need to publish/2 your message twice.

Finally, :neo@matrix should receive the message:

iex(neo@matrix)2> flush()
"Wake up, Neo..."
:ok

And that's it. A powerful pubsub in a few lines of code thanks to :pg2.

Follow the white rabbit

Conclusion

Erlang has several built-in hidden gems like :pg2 that make our lives easier.

gem

Happy coding!

Cover image by Nicolas Picard

Discussion (0)

pic
Editor guide