štef for Productive

Publish/Subscribe with Sidekiq

Introduction

Our Rails application is an old monolith that relies heavily on before/after callbacks to trigger code with side effects. This usually means that saving one model can change records of another model. I suspect we have all come across this "callback hell" at least once in our professional lives.

We needed to introduce a new service for search. Once we settled on Meilisearch, we needed a way to sync updates on our models with the records in Meilisearch. We could have kept using callbacks, but we wanted something better.

We settled on a Publish/Subscribe (Pub/Sub) pattern.
Pub/Sub is a design pattern where publishers broadcast messages to topics without specifying recipients, and subscribers listen for specific topics without knowing the source, promoting loose coupling.
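
To make the pattern concrete, here is a minimal, synchronous sketch in plain Ruby. TinyBus and the :search / :audit labels are made up for illustration and are not part of our codebase; the point is only that the publisher never names its recipients.

```ruby
# Hypothetical in-memory bus, for illustrating the pattern only.
class TinyBus
  def initialize
    @subscribers = Hash.new { |hash, topic| hash[topic] = [] }
  end

  # Register a block that will be called for every message on the topic.
  def subscribe(topic, &handler)
    @subscribers[topic] << handler
  end

  # Deliver the message to every handler registered for the topic.
  # fetch avoids creating an entry for topics nobody subscribed to.
  def publish(topic, message)
    @subscribers.fetch(topic, []).each { |handler| handler.call(message) }
  end
end

bus = TinyBus.new
received = []

# Two independent subscribers; neither knows who publishes.
bus.subscribe("task.upsert") { |message| received << [:search, message[:id]] }
bus.subscribe("task.upsert") { |message| received << [:audit, message[:id]] }

bus.publish("task.upsert", { id: 42 })
bus.publish("task.delete", { id: 42 }) # no subscribers, nothing happens
```

Everything here runs inline in the publisher's thread, which is exactly the limitation that pushed us towards a background job processor.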

Exploring Pub/Sub Solutions

While searching for a solution we came across these:

  • ActiveSupport::Notifications: A Rails component for defining and subscribing to events within an application.
    -> ActiveSupport::Notifications is synchronous by default. It does not handle errors and retries out of the box. We would need a background job handler (like Sidekiq) to make it asynchronous.

  • Wisper: A Ruby gem providing a decoupled communication layer between different parts of an application.
    -> I personally dislike Wisper. I used it in the past and did not like defining subscribers globally. I wanted topics to be arbitrary and each class to declare its own subscriptions.

  • dry-events: A component of the dry-rb ecosystem focused on event publishing and subscription.
    -> This gem looks like a framework on top of which I would need to build my own Pub/Sub. It also does not do asynchronous execution out of the box.

We finally settled on Sidekiq!
Sidekiq is a background job processing tool that we already use in our application, and we figured out a way to use it for processing Pub/Sub messages.

Implementation

Implementation is quite straightforward.

require 'singleton'
require 'set'

class PubSub
  class SubscriberJob < ApplicationJob
    queue_as :pub_sub

    def perform(message:, class_name:, handler:)
      class_name.constantize.public_send(handler, message)
    end
  end

  include Singleton

  # Publish a message to topic
  #
  # @param topic [String]
  # @param message [Hash]
  def self.publish(topic, **message)
    instance.subscribers(topic).each do |subscriber|
      PubSub::SubscriberJob.perform_later(message: message, **subscriber)
    end
  end

  # Subscribe a class + handler to a topic
  #
  # @param topic [String]
  # @param class_name [String, Class]
  # @param handler [String, Symbol]
  # @param async [Boolean] accepted, but not used by this implementation
  def self.subscribe(topic, class_name, handler, async: true)
    instance.subscribe(topic, class_name, handler)
  end

  def initialize
    @subscribers = {}
  end

  # Return subscribers for the topic
  #
  # @param topic [String]
  # @return [Set<Hash>] each entry is { class_name: String, handler: String or Symbol }
  def subscribers(topic)
    @subscribers.fetch(topic, Set.new)
  end

  # Subscribe a class + handler to a topic
  #
  # @param topic [String]
  # @param class_name [String, Class]
  # @param handler [String, Symbol]
  # @param async [Boolean] accepted, but not used by this implementation
  def subscribe(topic, class_name, handler, async: true)
    @subscribers[topic] ||= Set.new
    @subscribers[topic] << { class_name: class_name.to_s, handler: handler }
  end
end
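
To see what the registry does without booting Rails or Sidekiq, here is a stripped-down sketch. RegistrySketch, fan_out and Audit::Logger are hypothetical names used only for this illustration; fan_out returns the job arguments that publish would hand to perform_later instead of enqueuing anything.

```ruby
require "set"

# Simplified stand-in for the registry part of PubSub (no Rails, no Sidekiq).
class RegistrySketch
  def initialize
    @subscribers = {}
  end

  def subscribe(topic, class_name, handler)
    @subscribers[topic] ||= Set.new
    # Equal hashes count as the same Set member, so a repeated
    # subscription is stored only once.
    @subscribers[topic] << { class_name: class_name.to_s, handler: handler.to_s }
  end

  def subscribers(topic)
    @subscribers.fetch(topic, Set.new)
  end

  # Build one set of job arguments per subscriber instead of enqueuing a job.
  def fan_out(topic, **message)
    subscribers(topic).map { |subscriber| { message: message, **subscriber } }
  end
end

registry = RegistrySketch.new
registry.subscribe("task.upsert", "Search::Tasks::Indexer", :on_upsert)
registry.subscribe("task.upsert", "Search::Tasks::Indexer", :on_upsert) # duplicate
registry.subscribe("task.upsert", "Audit::Logger", :on_upsert)

jobs = registry.fan_out("task.upsert", item_id: 1)
```

Note that this sketch normalizes the handler with to_s, which the class above does not do. Without that, :on_upsert and "on_upsert" would be stored as two separate subscriptions.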

Usage

Usage is also quite straightforward:

class Search::Tasks::Indexer
  PubSub.subscribe('task.upsert', self, :on_upsert)

  def self.on_upsert(message)
    item = message[:item]

    MeilisearchClient.reindex(item.to_meilisearch)
  end
end

class TaskForm < ApplicationForm
  def save
    super
    PubSub.publish('task.upsert', item: self)
  end
end
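
One thing to keep in mind: the message travels through the job queue, so everything in it has to survive serialization. Active Job raises a serialization error for argument types it does not know how to serialize. A common way around this is to publish identifiers and load the record in the subscriber. The sketch below simulates the queue with a JSON round trip; TASKS, item_type and item_id are assumptions made for this example, not names from our application.

```ruby
require "json"

# Hypothetical in-memory stand-in for a database table of tasks.
TASKS = { 7 => { id: 7, title: "Write docs" } }.freeze

# Publisher side: send only JSON-friendly values, not the object itself.
payload = { item_type: "Task", item_id: 7 }

# Simulate the trip through the job queue.
wire = JSON.generate(payload)
received = JSON.parse(wire, symbolize_names: true)

# Subscriber side: load the record again from its identifier.
def on_upsert(message)
  TASKS.fetch(message[:item_id])
end

record = on_upsert(received)
```

Reloading in the subscriber also means the handler sees the record as it is when the job runs, not as it was when the message was published.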

Experience

After 6 months of using this method, I can safely say that it works as intended.

  • It handles unexpected errors such as network failures gracefully, because a failed handler job goes through Sidekiq's retry mechanism
  • It was easy to use where we needed it
  • Classes that handle the incoming messages are separated from the rest of the models and are easy to unit test
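
As a rough illustration of the last point, a handler is just a class method that takes a hash, so it can be exercised without publishing anything. IndexerSketch and FakeSearchClient below are hypothetical stand-ins, and the injectable client is an assumption of this sketch rather than how our indexer is wired.

```ruby
# Hypothetical fake that records calls instead of talking to Meilisearch.
class FakeSearchClient
  attr_reader :reindexed

  def initialize
    @reindexed = []
  end

  def reindex(document)
    @reindexed << document
  end
end

# Hypothetical subscriber with an injectable client.
class IndexerSketch
  class << self
    attr_accessor :client
  end

  def self.on_upsert(message)
    client.reindex(message[:item])
  end
end

# Call the handler directly with a plain hash; no PubSub or Sidekiq involved.
IndexerSketch.client = FakeSearchClient.new
IndexerSketch.on_upsert({ item: { id: 1, title: "Write docs" } })
```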

📖 Sidestory: Rethinking Pub-Sub
Recently Buha wrote a blog post about how he gave up on the Pub/Sub approach. It is a good read if you want to see the downsides of this approach.

Conclusion

We solved the problem of pub/sub messaging in our product with the help of Sidekiq. It proved to be reliable and of high quality, and the implementation itself is not complicated.

What are your experiences with Sidekiq? Are you using something else?

Top comments (3)

Rafael Gomes

Thanks for your post!

Can you tell us about deserialization?
Sidekiq 7+ prefers job arguments to be plain JSON-compatible hashes, and in your example you are passing self as the item named parameter. Do you handle the object in publish or somewhere else?

Pimp My Ruby

Hi! Thanks for sharing this. We use Wisper on a daily basis and it really feels overwhelming. Your approach looks very simple and easy to use. Have you encountered any issues?

štef

On the execution side, no. We have been running this for the last 6 months and haven't touched that code since :)

The problem we encountered is adoption. People just like after_commit hooks better for some reason...