DEV Community

sylvance
sylvance

Posted on

Implementing the Outbox Pattern with Sinatra, Sidekiq, and Postgres

Prerequisites:

  • Ruby installed on your system
  • Postgres installed and running
  • Redis server installed and running (default port: 6379)
  • Bundler gem installed (gem install bundler)

Step 1: Setup the Project

1.1 Create a new project directory and navigate to it:

Copy code

mkdir outbox_sinatra_example
cd outbox_sinatra_example
Enter fullscreen mode Exit fullscreen mode

1.2 Create a Gemfile to manage dependencies:

source 'https://rubygems.org'

gem 'sinatra'
gem 'pg'
gem 'sidekiq'
Enter fullscreen mode Exit fullscreen mode

1.3 Install the required gems using Bundler:

bundle install
Enter fullscreen mode Exit fullscreen mode

Step 2: Create the Outbox Table

2.1 Access your Postgres database

Assuming you have a database named 'your_database', using psql or any other PostgreSQL client.

2.2 Create the Outbox table:

In the psql shell do below, this will create a table for oubox messages;

CREATE TABLE outbox (
  id SERIAL PRIMARY KEY,
  message_type VARCHAR(255),
  message_body TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  processed_at TIMESTAMPTZ
);
Enter fullscreen mode Exit fullscreen mode

Step 3: Implement the Sinatra Endpoint and OutboxWorker

3.1 Create a new file named app.rb in the project directory:

# app.rb
require 'sinatra'
require 'pg'
require 'sidekiq'
require 'json'

# Configure Sidekiq with Redis URL
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis://localhost:6379/0' }
end

class OutboxWorker
  include Sidekiq::Worker
  sidekiq_options retry: 5 # Set the number of retries on failure

  def perform(message_id)
    conn = PG.connect(dbname: 'your_database')
    message = conn.exec_params('SELECT * FROM outbox WHERE id = $1', [message_id]).first
    return unless message

    # Ensure idempotency - check if the message has already been processed
    return if message['processed_at']

    begin
      # Process the message here (e.g., send the message to the appropriate destination)
      # ...

      # Mark the message as processed and record the processing time
      conn.exec_params(
        'UPDATE outbox SET processed_at = $1 WHERE id = $2',
        [Time.now, message_id]
      )
    rescue StandardError => e
      # If an error occurs, log it and let Sidekiq retry the job
      puts "Error processing message (id=#{message['id']}): #{e.message}"
      raise e
    end
  end
end

post '/create-message' do
  content_type :json
  data = JSON.parse(request.body.read)

  message_type = data['message_type']
  message_body = data['message_body']

  conn = PG.connect(dbname: 'your_database')
  result = conn.exec_params(
    'INSERT INTO outbox (message_type, message_body) VALUES ($1, $2) RETURNING id',
    [message_type, message_body]
  ).first

  # Enqueue the message for background processing
  OutboxWorker.perform_async(result['id']) if result

  status 201
  { message: 'Message created and enqueued for processing' }.to_json
end
Enter fullscreen mode Exit fullscreen mode

Step 4: Set Up the config.ru File

4.1 Create a config.ru file to define the application for use with Rack:

# config.ru
require './app'

run Sinatra::Application
Enter fullscreen mode Exit fullscreen mode

Step 5: Start the Application

5.1 Start Sidekiq worker in a separate terminal window:

bundle exec sidekiq -r ./app.rb
Enter fullscreen mode Exit fullscreen mode

5.2 Start the Sinatra application using the Rack server:

bundle exec rackup
Enter fullscreen mode Exit fullscreen mode

Step 6: Testing the Outbox Endpoint

Use a tool like curl or Postman to send a POST request to create a new message:

curl -X POST -H "Content-Type: application/json" -d '{"message_type":"notification","message_body":"Hello, World!"}' http://localhost:9292/create-message
Enter fullscreen mode Exit fullscreen mode

Step 7: Verify Message Processing

Check the logs in the terminal running Sidekiq to see the message processing:

[INFO] 2023-08-07T00:00:00.000Z: [OutboxWorker] start
Processing message: 1, Type: notification, Body: Hello, World!
[INFO] 2023-08-07T00:00:00.000Z: [OutboxWorker] done: 3.123 sec
Enter fullscreen mode Exit fullscreen mode

With these changes, the OutboxWorker now handles retries on failure using Sidekiq's retry mechanism. If a processing error occurs, the job will be retried up to 5 times (or any number you specify in sidekiq_options) before being moved to the Sidekiq dead queue.

Additionally, we've introduced idempotency in the worker by checking if the message has already been processed before attempting to process it again. If the processed_at field in the Outbox table is already set, the worker will skip processing the message again, ensuring that the same message isn't processed multiple times.

Keep in mind that idempotency also depends on the processing logic of the actual message handler, so ensure that your message processing code is idempotent as well.

Top comments (0)