DEV Community

Deyan Petrov


Use Change Streams instead of Traditional Outbox or Distributed Transactions

TL;DR: Use Change Streams instead of the traditional outbox pattern with a local database transaction or, even worse, a distributed transaction across database and message bus.

Introduction

In the context of Microservice Architecture (MSA) and Event-Driven Architecture (EDA) one of the core questions is how to save an entity's state to the database and reliably (= with 100% guarantee) push corresponding event(s) to a message bus, so that other services can eventually consume the published event(s) and do their job in an eventually consistent manner.

To my surprise there is still not enough awareness (e.g. judging by answers I get in interviews) of what the most efficient solution is. Furthermore, experts with 20+ years of experience still refer to older patterns which have already been superseded by newer, more efficient ones.

In detail, assuming we have these 2 operations:

  1. An entity's state is written to a database table (regardless of whether this is an insert/update of the last state, or an insert of an event in the event-sourced persistence approach)

  2. An event message is written to a message bus,

the problem to solve is the following: the process could crash between the 2 steps, which leaves us with an inserted/updated entity in the database but with no event sent to other microservices, so some side effects (e.g. sending a welcome email) are not performed.

Note: This post does not deal with the topic of how to create Domain Events/External Events. For more info on one way of doing that, please read the (older) post Current State + Last Event as an alternative to Event Sourcing.

Traditional Solutions

Outbox Pattern with a local database transaction


The idea here is to take advantage of local database transactions in order to guarantee 100% that an event message is also written in addition to writing the entity, and then process all event messages in a reliable/retriable manner (again with 100% guarantee that every message will be processed at least once).

The following 2 steps need to be implemented:

  1. Bundle together the insert/update of the entity's state with an insert of a "message/event" in another database table (in the same database), within a local database transaction:
BEGIN TRANSACTION
INSERT INTO/UPDATE Customers ...
INSERT INTO CustomerEvents ....
COMMIT TRANSACTION
  2. Then have a "Reader" process which periodically (e.g. every X seconds) goes over CustomerEvents sequentially, publishes every not-yet-published event to the message bus, and marks the event as "published". If the process crashes, it starts again from the last not-published event. As a crash is possible between publishing an event and marking it as published, an event may be published more than once, so delivery is at least once:

SELECT * FROM CustomerEvents 
WHERE IsPublished = False 
ORDER BY CreatedOn ASC
messageBusClient.publish(event)
UPDATE CustomerEvents 
SET IsPublished = True 
WHERE CustomerEventId = eventId
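The two steps above can be sketched end to end as follows. This is only a minimal illustration, assuming Python with an in-memory SQLite database as a stand-in for the real database, and a `publish` callback as a stand-in for the message bus client. The column names `Payload` and `Name`, and ordering by the auto-incremented `CustomerEventId` instead of `CreatedOn`, are assumptions made for the example.

```python
import sqlite3


def create_schema(conn):
    conn.execute("CREATE TABLE Customers (CustomerId TEXT PRIMARY KEY, Name TEXT)")
    conn.execute(
        "CREATE TABLE CustomerEvents ("
        "CustomerEventId INTEGER PRIMARY KEY AUTOINCREMENT, "
        "Payload TEXT NOT NULL, "
        "IsPublished INTEGER NOT NULL DEFAULT 0)"
    )


def save_customer_with_event(conn, customer_id, name):
    # Step 1: entity and event are written in ONE local transaction.
    # "with conn" commits on success and rolls back on an exception.
    with conn:
        conn.execute(
            "INSERT INTO Customers (CustomerId, Name) VALUES (?, ?)",
            (customer_id, name),
        )
        conn.execute(
            "INSERT INTO CustomerEvents (Payload) VALUES (?)",
            (f"CustomerCreated:{customer_id}",),
        )


def publish_pending(conn, publish):
    # Step 2: the "Reader". Publishes all not-yet-published events in order.
    rows = conn.execute(
        "SELECT CustomerEventId, Payload FROM CustomerEvents "
        "WHERE IsPublished = 0 ORDER BY CustomerEventId ASC"
    ).fetchall()
    for event_id, payload in rows:
        publish(payload)
        # A crash exactly here leaves the event unmarked, so it is
        # published again on the next run (at least once delivery).
        with conn:
            conn.execute(
                "UPDATE CustomerEvents SET IsPublished = 1 WHERE CustomerEventId = ?",
                (event_id,),
            )
    return len(rows)


def demo():
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    save_customer_with_event(conn, "c1", "Alice")
    save_customer_with_event(conn, "c2", "Bob")
    published = []
    first_run = publish_pending(conn, published.append)
    second_run = publish_pending(conn, published.append)  # nothing left to publish
    return published, first_run, second_run
```

Even in this small form, every write path has to remember the second insert and the transaction around it, which is exactly the overhead listed in the issues below.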

"Issues" with this approach:

  1. Need to create/maintain the CustomerEvents table

  2. Upon updating the Customer entity it is necessary to take care also of inserting into the CustomerEvents table, and this in a local database transaction (imagine there are many microservices with many entities to store) ... Ideally one would need to only care about inserting/updating Customers table, without anything else ...

  3. The local database transaction creates short-lived locks (row or page locks, depending on the database), which reduce concurrency

Distributed Transactions


This approach spans a distributed transaction across 2 different technologies: database and message bus. It used to be quite popular 15-20 years ago, but since then many sources discourage its use due to vendor lock-in, complexity and concurrency issues.

Pseudo code would be:

distributedTransactionCoordinator.StartTransaction()
try 
    databaseClient.insert/update(customer)
    messageBusClient.publish(event)
    distributedTransactionCoordinator.CommitTransaction()
with _ ->
    distributedTransactionCoordinator.RollbackTransaction()

I have not used distributed transactions in the past 15 years myself so treat the above pseudo code as a very rough approximation conveying only the general idea.

"Issues" with this approach:

  1. Distributed transactions are not supported across all technologies, so the database and the message bus must be chosen carefully
  2. Distributed transactions have their own complexity and implications in terms of rollbacks, locks, etc.

Note: Explaining how distributed transaction coordinators are implemented using Two-Phase Commit (2PC) is beyond the scope of this post, as this approach is anyway not used/recommended.

Recommended Solution: Change Streams/Feed/Change Data Capture


Many databases track the changes to table rows (or collection documents) in a separate table/collection, and use it for "notifying" interested subscribers about these changes (either internal - e.g. for data replication across nodes, or external - e.g. custom application code).

One example is MongoDB Change Streams. They are built on the oplog, a special system collection (local.oplog.rs) which records all changes to a database and its collections and is used for data replication across the nodes of a cluster. External clients can "watch" these changes with a pretty convenient client SDK:

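async {
    // Open a change stream cursor on the collection
    let! cursor = col.WatchAsync(pipeline, options, cancellationToken) |> Async.AwaitTask
    // Handle every change as it arrives; here it is only printed
    do! cursor.ForEachAsync(
            (fun change -> printfn "%A" change),
            cancellationToken) |> Async.AwaitTask
    // This should not be reached as the cursor is processing forever
}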

The local.oplog.rs is a capped collection, which means after some time the oldest entries are removed automatically from it. How long/much data is stored in this collection is configurable.

A sample document (= change in the code above) in local.oplog.rs looks like this:

[Image: sample change document from local.oplog.rs]

A client application can "watch" (poll) multiple collections individually and, for example, publish events to a message bus. By hooking into the database change stream in this way, guaranteed (at least once) publishing to a message bus can be accomplished, as long as the watcher resumes from its last position after a crash (see Checkpointing below).

Why this approach is better:

  1. When inserting/updating/deleting the entity in the original table/collection nothing else needs to be done. Also no 1:1 table/collection needs to be manually maintained

  2. The database change stream mechanism is of production quality, used also internally by the database itself, and is very stable/reliable. Same for the SDK/API for consuming it.

Additional Considerations

Message Delivery, Idempotency, Duplicate Detection

Having in mind the 3 message delivery guarantees:

  • At most once
  • At least once
  • Exactly once,

the most practical approach is to settle on "At least once", which means that the event message will be published at least once, but it is possible that it is published 2+ times (e.g. in case the publishing process crashes, or a timeout is received when calling the message bus API). Such duplicate event messages can be handled in the following ways:

  1. Idempotent processing or event message deduplication by the subscriber
  2. Event message deduplication by the Message Bus middleware

Idempotent processing or event message deduplication by the subscriber in simple terms means that if the same event is received by a subscriber twice, the subscriber is able to detect that this event has already been processed, do nothing and return OK (as if the event was processed successfully). This can be achieved in different ways, e.g.:

  1. A local ProcessedEvents table in the subscriber's database which contains the (unique) event id of those events which were successfully processed. Upon receiving a new event, a lookup in that table is done first; only if nothing is found is the event processed, otherwise OK is returned directly

  2. The event id is used as a column in another table, and there is a unique constraint on it. Upon inserting a new row to this other table the unique constraint generates an error in case of duplicate processing, which can be caught and converted to OK result

  3. The primary key of another table can be the same as the event id, so that again upon duplicate insert the default unique constraint generates an error in case of duplicate processing, which can be caught and converted to OK result
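A minimal sketch of such an idempotent subscriber, assuming Python with an in-memory SQLite database in place of the subscriber's real database. It combines options 1 and 2: the event id is the primary key of ProcessedEvents and is inserted first, so the constraint itself detects the duplicate. The `WelcomeEmails` table and the `handle_event` function are hypothetical names used only for illustration.

```python
import sqlite3


def create_subscriber_schema(conn):
    # The primary key on EventId acts as the unique constraint.
    conn.execute("CREATE TABLE ProcessedEvents (EventId TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE WelcomeEmails (CustomerId TEXT NOT NULL)")


def handle_event(conn, event_id, customer_id):
    """Processes an event at most once; both return values count as OK."""
    try:
        # Marking the event and performing the side effect share one
        # transaction, so either both are stored or neither is.
        with conn:
            conn.execute(
                "INSERT INTO ProcessedEvents (EventId) VALUES (?)", (event_id,)
            )
            conn.execute(
                "INSERT INTO WelcomeEmails (CustomerId) VALUES (?)", (customer_id,)
            )
        return "processed"
    except sqlite3.IntegrityError:
        # Duplicate event id: already handled, so report success and do nothing.
        return "duplicate"


def demo():
    conn = sqlite3.connect(":memory:")
    create_subscriber_schema(conn)
    results = [
        handle_event(conn, "evt-1", "c1"),
        handle_event(conn, "evt-1", "c1"),  # redelivery of the same event
        handle_event(conn, "evt-2", "c2"),
    ]
    emails = conn.execute("SELECT COUNT(*) FROM WelcomeEmails").fetchone()[0]
    return results, emails
```

This only works as shown when the side effect is itself a write to the same database; a side effect such as calling an external email API would need its own idempotency handling.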

Event message deduplication by the message bus middleware itself is something that I usually do not use, as I prefer not to be tightly coupled to specific middleware with exclusive functionality, but instead be able to use any message bus having just basic pub-sub (at least once) functionality ...

Checkpointing

What happens if the subscriber process consuming the change stream dies/crashes? Upon restarting, that process has several options:

  1. Start re-processing all change stream events from the beginning - this will result in a lot of duplicate events published, and requires idempotent/deduplicating downstream subscribers

  2. Start from the end of the change streams (only new change stream events will be processed) - in this case some events in between will never be processed/published.

  3. Start from a checkpoint which indicates last successfully processed change stream entry.

Obviously option 3 is the best but it requires a little bit of extension to the change stream "watching" code:

  1. Upon startup the watcher reads the last checkpoint from a custom table and starts watching change stream events from that one onwards
  2. After processing every 1 (or N) change stream events the watcher must update the checkpoint with the last successfully processed event id/timestamp (in case of MongoDB every change stream event has a ResumeToken property). If the checkpoint is saved after every processed change stream event, then at most 1 change stream event will be retried. If the checkpoint is saved less frequently, e.g. after every 5th event, then at most 5 events will be retried in case of a process crash or similar. In both cases the "at least once" message delivery guarantee is still valid, and if downstream processors are implemented with this in mind there should be no problem.
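The effect of the checkpoint frequency can be illustrated with a small simulation. This is a sketch under simplifying assumptions: Python instead of the MongoDB driver, a plain list of `(token, payload)` tuples standing in for the change stream, an integer standing in for the resume token, and a dict standing in for the custom checkpoint table. `run_watcher` and `SimulatedCrash` are hypothetical names.

```python
class SimulatedCrash(Exception):
    """Stands in for the watcher process dying."""


def run_watcher(change_log, checkpoint, publish, checkpoint_every=1, crash_after=None):
    # change_log: list of (token, payload) with ascending tokens.
    # checkpoint: dict holding the last saved token under the key "token".
    last_token = checkpoint.get("token")
    handled = 0
    for token, payload in change_log:
        if last_token is not None and token <= last_token:
            continue  # already covered by the saved checkpoint
        if crash_after is not None and handled == crash_after:
            raise SimulatedCrash()
        publish(payload)
        handled += 1
        if handled % checkpoint_every == 0:
            checkpoint["token"] = token  # save progress every N events


def demo():
    change_log = [(i, f"e{i}") for i in range(1, 8)]  # tokens 1..7
    checkpoint, published = {}, []
    try:
        # First run: checkpoint every 3 events, crash after 5 were published.
        run_watcher(change_log, checkpoint, published.append,
                    checkpoint_every=3, crash_after=5)
    except SimulatedCrash:
        pass
    # Restart: resumes after the last saved checkpoint (token 3).
    run_watcher(change_log, checkpoint, published.append, checkpoint_every=3)
    return published, checkpoint
```

In this run e4 and e5 are published twice, because they were handled after the last saved checkpoint and before the crash. Nothing is lost, which is the "at least once" behaviour described above.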

Multiple subscribers (Competing Consumers)

By default the change stream event handling application is a single instance/single thread application, which gets all change stream events and publishes them to a message bus as event messages. This is a very fast operation and for "normal" applications a single instance is perfectly fine, as long as it is hosted in such a way (e.g. Kubernetes) that if it dies then automatically a new instance is spawned.

But what if there are so many events per second that multiple instances are required to run in parallel?


This is something I have not needed so far, but in such a case I would probably have e.g. 3 application instances listening to the same change stream (Customers), each one handling only every third change stream event and skipping the other two. Every change stream "watcher" will have its own checkpoint.
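One possible way to split the work is sketched below. Note that it is a variation on the idea above, not the exact scheme: instead of counting events, each instance picks its share by hashing the document key, so all changes of one customer go to the same instance and keep their relative order. The event shape (a dict with a `key` field) and the function names are assumptions made for the example.

```python
import zlib


def owner_of(document_key, instance_count):
    # crc32 gives the same result in every process, unlike Python's
    # built-in hash() for strings, which is randomized per process.
    return zlib.crc32(document_key.encode("utf-8")) % instance_count


def events_for_instance(events, instance_index, instance_count):
    # Each watcher sees the full stream and skips what it does not own.
    return [
        event for event in events
        if owner_of(event["key"], instance_count) == instance_index
    ]


def demo():
    events = [{"key": f"customer-{i % 4}", "seq": i} for i in range(12)]
    return events, [events_for_instance(events, i, 3) for i in range(3)]
```

Every event lands in exactly one share, but the shares are not necessarily of equal size, since the split depends on how the keys hash.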

Event Replay

Sometimes there is need for replaying all events. We have 2 "buffers" to consider:

  1. The Change Stream database storage - in case of MongoDB is configurable but usually does not hold all changes since the creation of the database

  2. The Message Bus - also stores messages only for a limited period of time, e.g. in case of standard Azure Event Hubs for 7 days

In the best case only change stream events or event messages which are still in one of the 2 buffers need to be replayed.

If events must be replayed all the way from source table (e.g. Customers) then we have 3 options:

  1. All changes to Customers are additionally stored in a CustomerAuditTrailEntries collection, so the latter can be read sequentially and for each entry a custom change stream event can be created and published to the message bus directly.

  2. Customers table actually does not exist, Event Sourcing is used and every single Customer event is inserted as a separate table row/document. Then iterating over these rows/documents will be sufficient for generating the event messages and publishing them to the message bus

  3. No audit trail is stored, no Event Sourcing is used, updates are done in place. In this case the best that can be done is to generate and publish to the message bus events for the last state of each customer entity.

It is worth mentioning that the Event Replay itself can also be exposed to external webhook subscribers who can trigger it over a REST API (e.g. POST /replays with from,to,eventType and subscriberId properties in the request body).

Furthermore, the event replay should ideally publish event messages to a separate topic on the message bus, to which only the subscribers interested in the replay are listening. Otherwise all existing subscribers can be flooded with a lot of events.

Last but not least, sometimes direct database manipulations might be required (e.g. database refactoring/migrations). Such a migration may or may not need to trigger change stream event processing. If it should not, a simple filtering scheme on some entity property (e.g. LastModifiedOn) should be introduced in the change stream "watching" application so that such changes can be skipped.

Error Handling

While processing the change stream events (= looping over the open cursor polling the change stream collection) an error/exception may happen in relation to:

  1. Individual document - e.g. cannot be deserialized, or publishing to message bus fails due to transient error
  2. The whole change stream - e.g. network/database outage

An individual document processing error can be handled by simply "parking" the document in a retry queue, which is handled automatically by another thread with a small delay. If reprocessing fails again, the message can be parked in a poison queue, to be looked at manually.

In case of a general error the change stream can either be restarted from the last saved checkpoint after a small delay (= catch exception, re-open cursor), or the whole process may be allowed to crash, which triggers its automatic restart by e.g. Kubernetes.
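The handling of individual document errors can be sketched as follows. This is a simplified illustration in Python: the retry queue is drained in the same thread after the main loop rather than by a separate thread with a delay, and `process_changes`, `max_attempts` and the in-memory queues are hypothetical stand-ins for whatever a real watcher would use.

```python
from collections import deque


def process_changes(changes, publish, max_attempts=3):
    retry_queue = deque()  # entries are (change, attempts made so far)
    poison_queue = []      # changes that need manual inspection
    for change in changes:
        try:
            publish(change)
        except Exception:
            # Park the change and continue with the next one.
            retry_queue.append((change, 1))
    while retry_queue:
        change, attempts = retry_queue.popleft()
        try:
            publish(change)
        except Exception:
            if attempts + 1 >= max_attempts:
                poison_queue.append(change)  # give up after max_attempts tries
            else:
                retry_queue.append((change, attempts + 1))
    return poison_queue


def demo():
    published = []
    remaining_failures = {"transient": 1}  # fails once, then succeeds

    def publish(change):
        if change == "bad":
            raise ValueError("cannot be deserialized")
        if remaining_failures.get(change, 0) > 0:
            remaining_failures[change] -= 1
            raise TimeoutError("message bus temporarily unavailable")
        published.append(change)

    poison = process_changes(["a", "transient", "bad", "b"], publish)
    return published, poison
```

Note that parking changes the publishing order: in the demo the retried change is published after the ones that followed it, so subscribers which depend on ordering need to take that into account.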

Remember, what we are speaking about here is just iterating over a database collection containing changes (= change stream) while keeping a clientside "index" how far down the road the watcher is. In case of a single watcher (enough in most cases) you can always stop for a while, wait, restart from last index or even from a previous position on the stream. This is the same simple and scalable approach used in many other technologies based on the append-only "Log".

Further Reading:

  1. https://blog.insiderattack.net/atomic-microservices-transactions-with-mongodb-transactional-outbox-1c96e0522e7c
  2. https://medium.com/wix-engineering/event-driven-architecture-5-pitfalls-to-avoid-b3ebf885bdb1
