DEV Community

calvinsadewa
calvinsadewa

Posted on

Implementing Message Outbox Pattern with Oban

Hey guys, today i want to share Message Outbox pattern and implementation with Oban in Elixir
I have used them extensively and hope that it can benefit you too.

Problems

What happen if as part of database transaction, you need to send email / push notification / outside communication?
Implemented naively, we could just send email / push notification as part of transaction.

Imagine we have transfer_money function which send money from a sender to receiver,
as part of operation, transfering money need to also send email notification to sender and receiver.

Naively, we could make transfer_money a transaction which do:

  1. substract sender balance
  2. send email to sender regarding substracted money
  3. add receiver balance
  4. send email to receiver regarding added money

Here is pseudo-code snippet implementing transfer_money function in elixir.

# model/payment.ex
defmodule Model.Payment do
    alias Repo
    import Ecto.Changeset, only: [change: 2]

    # Transfer money from sender to receiver
    # as part of transfer, we will send email notification
    def transfer_money(%{
        sender: %Account{} = sender,
        receiver: %Account{} = receiver,
        amount: %Decimal{} = amount
    }) do
        Repo.transaction(fn -> 
          Repo.update!(change(sender, balance: Decimal.sub(sender.balance, amount))) # operation 1
          send_transfer_email!(sender, "substracted by", amount) # operation 2
          Repo.update!(change(receiver, balance: Decimal.add(receiver.balance, amount))) # operation 3
          send_transfer_email!(sender, "received", amount) # operation 4
        end)
    end

    # Send transfer email notification
    def send_transfer_email!(account, operation, amount) do
        to = account.email
        from "payment@mycompany.com"
        body = """
        Your bank account #{account.account_number} have #{operation} #{amount}
        """

        {:ok, _} = send_email(to, from, body)
    end
end
Enter fullscreen mode Exit fullscreen mode

Despite seeming okay, it is possible bug that can happen. Let me show a scenario:

  1. transfer_money function initiated
  2. Open a transaction
  3. substract sender balance by amount via DB operation
  4. send email notification to sender that balance has been substracted
  5. add receiver balance by amount via DB operation
  6. try send email notification to recevier that balance has been added, however failed due to transient network failure
  7. Transaction rolledback

In above failure scenario, even though transfer_money function rolledback,
sender already sended email notifing s/he bank account subtracted emoji.

While mis-sent email may seem harmless (though will probably shock sender that s/he subtracted twice),
These kind of cases can also happen with Message Queue (RabbitMQ, Kafka, SQS) using system,
which would mean subscribing service get wrong data, in turn may spawn more problem.

The problem is that operation 2 and 4 cannot rolledback, Message Outbox pattern help us with it.

Message Outbox

Message Outbox is a pattern in which we insert intention to send message to Outbox in transactional manner,
Outbox is usually a table which then polled/listened regulary by Worker. Worker would send any message waiting to be sended.

With that in mind, let us patch transfer_money to:

  1. substract sender balance
  2. insert intent to send email for sender to outbox
  3. add receiver balance
  4. insert intent to send email for receiver to outbox

We also need to implement Worker which poll/listen to our outbox, and send email if any pending.

Here is snippet of pseudo function implementing Message Outbox

# model/payment.ex
defmodule Model.Payment do
    ...
    # insert Send transfer email notification to outbox
    def send_transfer_email!(account, operation, amount) do
        Repo.insert!(%SendMessage{account: account, operation: operation, amount: amount})
    end

    # Real implementation of sending email
    def real_send_transfer_email!(account, operation, amount) do
        to = account.email
        from "payment@mycompany.com"
        body = """
        Your bank account #{account.account_number} have #{operation} #{amount}
        """

        {:ok, _} = send_email(to, from, body)
    end
end

# outbox/worker.ex
defmodule Outbox.Worker do
    # Perform is done regularly by worker, either via polling/listening
    def perform(%SendMessage{} = send_message) do
        Model.Payment.real_send_transfer_email!(account)
    end
end
Enter fullscreen mode Exit fullscreen mode

With Message Outbox, outside communication become part of transaction, and can be rolledback in case transaction failed.
However, implementing worker which listen/poll for message can be hard,
not to mention we still need to handle cases where we failed sended message and retry logic.

Oban

Oban is a transactional job queue in elixir built on PostgreSQL. It handle many of the operation relating to job queue such as enqueuing job,
distribute job to worker, handle job processing throughout lifecycle, log job status, and more. You can see more at link https://hexdocs.pm/oban/1.0.0/Oban.html

Because of it's transactional feature in PostgreSQL, we can leverage Oban as our Outbox.

Using Oban, we will do:

  1. Setup Oban in config.exs and Migration (I left out this part, see https://hexdocs.pm/oban/1.0.0/Oban.html )
  2. Implement an Oban worker which would send email for us, Outbox.Email, and implement processing for job send_email
  3. modify send_transfer_email! to insert job send_email to Outbox.Email

Let's modify our snippet to use Oban.

# model/payment.ex
defmodule Model.Payment do
    alias Repo
    import Ecto.Changeset, only: [change: 2]

    # Transfer money from sender to receiver
    # as part of transfer, we will send email notification
    def transfer_money(%{
        sender: %Account{} = sender,
        receiver: %Account{} = receiver,
        amount: %Decimal{} = amount
    }) do
        Repo.transaction(fn -> 
          Repo.update!(change(sender, balance: Decimal.sub(sender.balance, amount))) # operation 1
          send_transfer_email!(sender, "substracted by", amount) # operation 2
          Repo.update!(change(receiver, balance: Decimal.add(receiver.balance, amount))) # operation 3
          send_transfer_email!(sender, "received", amount) # operation 4
        end)
    end

    # insert intent to send transfer email notification
    def send_transfer_email!(account, operation, amount) do
        to = account.email
        from "payment@mycompany.com"
        body = """
        Your bank account #{account.account_number} have #{operation} #{amount}
        """

        # Insert args to Outbox.Email
        %{
            to: to, 
            from: from, 
            body: body, 
            event: "send_email"
        }
        |> Outbox.Email.new()
        |> Oban.insert!()
    end
end

# outbox/email.ex
defmodule Outbox.Email do
    use Oban.Worker, queue: :email

    @impl Oban.Worker
    # Process sending email
    def perform(%{"event" => "send_email"} = args, _job) do
        {:ok, _} = send_email(args["to"], args["from"], args["body"])
    end
end
Enter fullscreen mode Exit fullscreen mode

It's pretty simple doesn't it? Though args for Oban worker is interesting.
When i insert args to Outbox.Email, i insert it as Atom Map. However when i process args at perform function, it become String Map.
This is because when Oban inserting job to Database, it transform args to JSON, and when Worker try to process job, args is parsed as JSON.

While Message Outbox is good, one weakness is that Message Outbox cannot depend on outside communication result inside transaction.
If your transaction really need to depend on outside communication, you may consider implementing two phase (or three phase) commit.

**Note: Oban is built upon PostgreSQL, so if you use MariaDB/MySQL you may consider other Job Queue such as Rihanna or implement polling by using cron-based Quantum

**Special thanks for Nanda Girindratama and Rifki Adrian for early review

Top comments (0)