This pattern also known as application events, try to solves the following problem.
Problem
How to reliably/atomically update the database and send messages/events?
Forces
- 2PC is not an option. 2PC stands for Two phase commit...Google :).
- If the database transaction commits messages must be sent. Conversely, if the database rolls back, the messages must not be sent
- Messages must be sent to the message broker in the order they were sent by the service. This ordering must be preserved across multiple service instances that update the same aggregate.
Application description
In order to be more explicit with my examples, I'll describe you a silly application which business model revolves around cats, ok? So of course we have a
table in our DB called "cats", which store basic information about the cats registered in our platform. The problem that we want to resolve it's that
on every update that we made to a cat in our DB, for example address changed, name changed, starving state changed, etc... We want to let know this to other
services, or consumers in this case, that might use this information. All clear right, now reading at the Forces section, you must notice that those requirements are not that crazy. Now let's go to the solution, propose for this pattern.
Solution
The solution that this pattern propose is as follow. Apart from the table "cats", with all the information about cats, we will have another table that will be the outbox("buzón de salida", in my beloved language), let's name this table "events". In this table we will store messages/events, only when the operation in the cats DB succeeded. As both operations, the one that write into the "cats" table and the one that write into "events" table, will happen in the
same transaction this assure us that we will only commit the transaction when both operations succeeded.
Now, after this write in "events" table, we will have a process that will read from this table, and publish into the message broker.
The flow it's better described in the following image.
Benefits
- Messages are guaranteed to be sent if and only if the database transaction commits
- Messages are sent to the message broker in the order they were sent by the application
Drawbacks
- Developer might forget to publish the message/event after updating the database.
- If the Message Relay crash, you might end up sending more than one time the same message. This implies that message consumers should be idempotent, so the processing of a same message more than one time, doesn't has an effect the second time it's processed with the same input. Read more about idempotent in this Stackoverflow question.
Implementation example
Until now we've been basically copying and pasting from Pattern: Transactional outbox, shameless copy and paste, that's right. As almost all my articles, the purpose is to actually get my hands dirty and really learn about something I don't have
a clue. In order to grasp this pattern well, we need to code it, otherwise I'll forget it. Let's do it. I'll be using Golang for this.
Project structure
code/
├── config
│ └── config.go
├── controllers
│ ├── cat
│ │ └── controller.go
│ └── model
│ ├── cat_converter.go
│ └── cat.go
├── database
│ ├── database.go
│ └── migrations
│ ├── 20221005203113_events_table.sql
│ └── 20221005203128_cat_table.sql
├── docker-compose.yaml
├── go.mod
├── go.sum
├── main.go
├── Makefile
├── middlewares
│ └── cors.go
├── msgrelay
│ └── msgrelay.go
├── README.md
├── repositories
│ ├── cat
│ │ └── repository.go
│ ├── event
│ │ └── repository.go
│ └── model
│ ├── cat.go
│ └── event.go
└── routes
└── routes.go
Models
Let's declare our models in golang.
Cats
Here are some characteristics from cats that we want to reflect in our platform.
- Name
- Color
- Weight
- Intelligence (in a scale from 1-5, because only dogs are in a scale from 1-10, they are way more smart)
- Laziness (in a scale from 1-10)
- Curiosity (in a scale from 1-10)
- Sociability (in a scale from 1-10)
- Egoism (in a scale from 5-10)
- Miau Power (how loud he can miau, from a scale from 1-10)
- Attack power (in a scale from 1-10)
According to these requirements we have the following structs:
file: repository/model/cat.go
package model
type Cat struct {
ID string `gorm:"primaryKey"`
Name string `gorm:"column:name;size:100"`
Color string `gorm:"column:color;size:100"`
Weight float64 `gorm:"column:weight"`
Intelligence int `gorm:"column:intelligence"`
Laziness int `gorm:"column:laziness"`
Curiosity int `gorm:"column:curiosity"`
Sociability int `gorm:"column:sociability"`
Egoism int `gorm:"column:egoism"`
MiauPower int `gorm:"column:miau_power"`
Attack int `gorm:"column:attack"`
}
Just remember that this is our main resource, the want the we want to perform an update operation wit.
Outbox
In this table we will store the message/event information we want to send to the other services, it's up to you what you want here, but you should at
least reference which resource was changed. In our case we just want to store, the id of the resource, the type of resource, and the type of event that was triggered.
file: repository/model/event.go
package model
type Event struct {
ID string `gorm:"primaryKey"`
Type string `gorm:"column:type"`
ResourceID string `gorm:"column:resource_id"`
ResourceType string `gorm:"column:resource_type"`
Published bool `gorm:"column:published"`
}
Basic Create and Update operations
For our purpose the simplest scenario it's as follows, I create a cat, then update the cat, and this update operation should trigger the event. So we will
only be creating these two operations, feel free to create the other part of the CRUD, I'm lazy.
file: repository/cat/repository.go
package cat
import (
"context"
"github.com/Gealber/outbox/repositories/model"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type repository struct {
db *gorm.DB
}
func New(db *gorm.DB) *repository {
return &repository{db: db}
}
// Create creates a new cat :).
func (r *repository) Create(ctx context.Context, cat model.Cat) (*model.Cat, error) {
if err := r.db.Create(&cat).Error; err != nil {
return nil, err
}
return &cat, nil
}
// Update perform update operation to specified cat :).
func (r *repository) Update(ctx context.Context, id string, cat model.Cat) (*model.Cat, error) {
err := r.db.Transaction(func(tx *gorm.DB) error {
// update record in cats table.
result := tx.Model(&model.Cat{}).Clauses(clause.Returning{}).
Where("id = ?", id).
Updates(&cat)
if result.RowsAffected < 1 {
return gorm.ErrRecordNotFound
}
// create event to store.
event := model.Event{
Type: "update",
ResourceType: "cat",
ResourceID: id,
}
// write event in events table.
if err := tx.Model(&model.Event{}).Create(&event).Error; err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return &cat, nil
}
To be continued...
Now, one of the parts in this pattern, and an important one, is the Message Relay. Let's discuss that in other article, I'm tired of typing :), lazy like a cat.
Code
Part of the code can be found in this repository, I say part because the message relay use the Polling publisher pattern, which is not scalable. I'll discuss that other time.
Top comments (0)