DEV Community

yedf2

How to Implement Saga Pattern in Microservices

A microservice architecture lets us maintain application services and their databases separately. As a consequence, data changes often need to propagate across multiple services. Database transactions cannot span multiple services, so maintaining data integrity becomes a challenge.

SAGA

Saga is a distributed transaction pattern introduced in the paper SAGAS. The core idea is to split a long transaction into multiple short local transactions that are coordinated by a Saga transaction coordinator. If every local transaction completes successfully, the Saga completes normally; if any step fails, the compensating operations are invoked one at a time in reverse order.
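To make the forward and reverse flow concrete, here is a minimal in-memory sketch in Go. It is not how dtm is implemented: `Step`, `RunSaga`, and `Transfer` are hypothetical names invented for this illustration, and the "accounts" are just entries in a map.

```go
package main

import (
	"errors"
	"fmt"
)

// Step pairs a forward action with the compensation that undoes it.
// The type is hypothetical and exists only for this illustration.
type Step struct {
	Name       string
	Action     func() error
	Compensate func() error
}

// RunSaga executes the steps in order. If a step fails, it compensates the
// steps that already succeeded, in reverse order, and returns the error.
func RunSaga(steps []Step) error {
	for i, s := range steps {
		if err := s.Action(); err != nil {
			for j := i - 1; j >= 0; j-- {
				if cerr := steps[j].Compensate(); cerr != nil {
					return fmt.Errorf("compensating %s: %w", steps[j].Name, cerr)
				}
			}
			return fmt.Errorf("step %s failed: %w", s.Name, err)
		}
	}
	return nil
}

// Transfer moves amount from A to B. failIn forces the TransIn step to fail
// so that the rollback path can be observed.
func Transfer(balances map[string]int, amount int, failIn bool) error {
	steps := []Step{
		{
			Name:       "TransOut",
			Action:     func() error { balances["A"] -= amount; return nil },
			Compensate: func() error { balances["A"] += amount; return nil },
		},
		{
			Name: "TransIn",
			Action: func() error {
				if failIn {
					return errors.New("account B is abnormal")
				}
				balances["B"] += amount
				return nil
			},
			Compensate: func() error { balances["B"] -= amount; return nil },
		},
	}
	return RunSaga(steps)
}

func main() {
	balances := map[string]int{"A": 100, "B": 100}
	fmt.Println(Transfer(balances, 30, false), balances) // both steps succeed
	fmt.Println(Transfer(balances, 30, true), balances)  // TransIn fails, TransOut is compensated
}
```

One simplification to keep in mind: this sketch compensates only the steps that completed. As the rollback section of this article shows, dtm also calls the compensation of the failed branch, because a coordinator working over the network cannot always know whether a failed step committed anything.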

This article presents a complete SAGA example to give the reader an accurate understanding of SAGA transactions.

Business Scenario

An inter-bank transfer is a typical distributed transaction scenario: A needs to transfer money to B, whose account is at a different bank. Both the withdrawal and the deposit may succeed or fail, and the sum of the balances of A and B must not change once the transfer has finished, regardless of any errors that occur.

SAGA transaction

We will present a detailed, runnable example based on dtm, a distributed transaction framework.

Suppose that you have already implemented the business logic for the transfer and its rollback as the following endpoints:

  • "/api/SagaBTransOut"
  • "/api/SagaBTransOutCom" compensation for TransOut
  • "/api/SagaBTransIn"
  • "/api/SagaBTransInCom" compensation for TransIn

The following code orchestrates these operations into a SAGA distributed transaction. The transaction finishes when both TransOut and TransIn have succeeded, or when both have been rolled back. In both cases, the sum of the balances of A and B remains the same.

    req := &gin.H{"amount": 30} // payload sent to the microservices
    // DtmServer is the address of the DTM service
    saga := dtmcli.NewSaga(DtmServer, dtmcli.MustGenGid(DtmServer)).
        // Add the TransOut subtransaction: qsBusi+"/SagaBTransOut" is the forward action, qsBusi+"/SagaBTransOutCom" is the compensation
        Add(qsBusi+"/SagaBTransOut", qsBusi+"/SagaBTransOutCom", req).
        // Add the TransIn subtransaction: qsBusi+"/SagaBTransIn" is the forward action, qsBusi+"/SagaBTransInCom" is the compensation
        Add(qsBusi+"/SagaBTransIn", qsBusi+"/SagaBTransInCom", req)
    // Submit the saga transaction; dtm will complete all subtransactions or roll all of them back
    err := saga.Submit()

A successful transaction timing diagram is as follows.

saga-normal.jpg

Core operations

The adjustment and compensation of user balances must be handled carefully, so here we look at the adjustment in detail. For the bank transfer in this example, the action operations perform TransOut and TransIn, and the compensation operations perform the opposite adjustments.

First we create the account balance table.

CREATE TABLE dtm_busi.`user_account` (
  `id` int(11) AUTO_INCREMENT PRIMARY KEY,
  `user_id` int(11) NOT NULL UNIQUE,
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00',
  `trading_balance` decimal(10,2) NOT NULL DEFAULT '0.00',
  `create_time` datetime DEFAULT now(),
  `update_time` datetime DEFAULT now()
);

Then write the core business code that adjusts the user's account balance.

func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error {
    _, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?" , amount, uid)
    return err
}

Then write the handler functions for the action and compensation operations.

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
  barrier := MustBarrierFromGin(c)
  return barrier.Call(txGet(), func(tx *sql.Tx) error {
    return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, "")
  })
}))
app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
  barrier := MustBarrierFromGin(c)
  return barrier.Call(txGet(), func(tx *sql.Tx) error {
    return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
  })
}))
app.POST(BusiAPI+"/SagaBTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
  barrier := MustBarrierFromGin(c)
  return barrier.Call(txGet(), func(tx *sql.Tx) error {
    return SagaAdjustBalance(tx, TransOutUID, -reqFrom(c).Amount, "")
  })
}))
app.POST(BusiAPI+"/SagaBTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
  barrier := MustBarrierFromGin(c)
  return barrier.Call(txGet(), func(tx *sql.Tx) error {
    return SagaAdjustBalance(tx, TransOutUID, reqFrom(c).Amount, "")
  })
}))

The core logic of these handler functions is to adjust the balance. The role of barrier.Call is explained in detail later.

Run

Follow these steps to run a successful example.

  1. Run dtm, which manages the distributed transactions
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
  2. Run the example
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_barrier

Handling network exceptions

Suppose that a transient fault occurs while dtm is invoking an operation of a submitted transaction. dtm will retry the incomplete operation, which requires the subtransactions of the global transaction to be idempotent. The dtm framework pioneered the sub-transaction barrier technique and provides the BranchBarrier utility class to help users handle idempotency easily. It provides a function Call that guarantees that the operation inside this function is committed at most once:

func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) error

BranchBarrier automatically handles not only idempotency but also null compensation and hanging issues; see exceptions and solutions for details.
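The at-most-once guarantee under retries can be pictured with a small in-memory sketch in Go. It only illustrates the idea of recording a key for each processed operation; it is not dtm's BranchBarrier. `Ledger`, `AdjustOnce`, `Retry`, and `ErrTransient` are hypothetical names, and the key format is an assumption made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTransient simulates a reply that never reached the caller.
var ErrTransient = errors.New("response lost in transit")

// Ledger is a hypothetical in-memory stand-in for an account row plus a
// table of operation keys that have already been processed.
type Ledger struct {
	Balance int
	done    map[string]bool
}

func NewLedger(balance int) *Ledger {
	return &Ledger{Balance: balance, done: map[string]bool{}}
}

// AdjustOnce applies delta only the first time it sees key. With a real
// database, recording the key and updating the balance would have to happen
// in the same local transaction.
func (l *Ledger) AdjustOnce(key string, delta int) {
	if l.done[key] {
		return // duplicate delivery, nothing to do
	}
	l.done[key] = true
	l.Balance += delta
}

// Retry calls op until it succeeds or the attempts are used up, the way a
// coordinator re-invokes an operation it believes is incomplete.
func Retry(attempts int, op func(attempt int) error) error {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = op(i); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	l := NewLedger(100)
	err := Retry(3, func(attempt int) error {
		// The adjustment is applied, but the first two replies are "lost".
		l.AdjustOnce("gid-1/branch-01/action", -30)
		if attempt < 3 {
			return ErrTransient
		}
		return nil
	})
	fmt.Println(err, l.Balance)
}
```

The operation is delivered three times, yet the balance is reduced by 30 only once, because the second and third deliveries find their key already recorded.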

Handling rollbacks

What happens if the bank is preparing to transfer the amount to user B, finds that user B's account is abnormal, and returns a failure? We update the handler function so that the transfer-in operation returns a failure:

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
  return dtmcli.ErrFailure
}))

The timing diagram for the failed transaction is as follows.

saga-rollback

The action of the TransIn branch did nothing and returned a failure. Will the compensation of the TransIn branch then apply a reverse adjustment that it should not?

Don't worry: the sub-transaction barrier technique described above ensures that the compensation of a failed TransIn is a null operation if the error occurs before the commit, and performs the opposite adjustment if the error occurs after the commit.
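The two cases can be modeled with a simplified in-memory barrier in Go. This is a sketch of the general idea under stated assumptions, not dtm's code: `Barrier`, its `Call` signature, the key layout, and `ErrAbnormal` are all hypothetical, and a map stands in for a database table with a unique key. Deleting a record when the business function fails imitates the rollback of the local transaction.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	OpAction     = "action"
	OpCompensate = "compensate"
)

// ErrAbnormal simulates a business failure inside the action.
var ErrAbnormal = errors.New("account is abnormal")

// Barrier is a hypothetical in-memory model of a barrier table keyed by
// global transaction ID, branch ID, and operation.
type Barrier struct {
	records map[string]bool
}

func NewBarrier() *Barrier { return &Barrier{records: map[string]bool{}} }

// insert reports whether key was newly recorded, like an insert that is
// ignored when the unique key already exists.
func (b *Barrier) insert(key string) bool {
	if b.records[key] {
		return false
	}
	b.records[key] = true
	return true
}

// Call runs busi at most once per (gid, branch, op). A compensation whose
// action never committed becomes a no-op, and an action that arrives after
// its compensation is skipped.
func (b *Barrier) Call(gid, branch, op string, busi func() error) error {
	actionKey := gid + "/" + branch + "/" + OpAction
	currentKey := gid + "/" + branch + "/" + op
	if op == OpCompensate {
		actionNew := b.insert(actionKey) // true means the action never committed
		currentNew := b.insert(currentKey)
		if actionNew || !currentNew {
			return nil // null compensation, or a repeated compensation
		}
	} else if !b.insert(currentKey) {
		return nil // repeated action, or an action arriving after compensation
	}
	if err := busi(); err != nil {
		delete(b.records, currentKey) // imitate rolling back the local transaction
		return err
	}
	return nil
}

func main() {
	balance := 100
	credit := func() error { balance += 30; return nil }
	debit := func() error { balance -= 30; return nil }
	b := NewBarrier()

	// Case 1: the action fails before committing, so compensation does nothing.
	_ = b.Call("g1", "02", OpAction, func() error { return ErrAbnormal })
	_ = b.Call("g1", "02", OpCompensate, debit)
	fmt.Println("after case 1:", balance)

	// Case 2: the action commits and the failure is reported afterwards,
	// so compensation reverses the adjustment exactly once.
	_ = b.Call("g2", "02", OpAction, credit)
	_ = b.Call("g2", "02", OpCompensate, debit)
	_ = b.Call("g2", "02", OpCompensate, debit) // retried delivery
	fmt.Println("after case 2:", balance)
}
```

In both cases the balance ends where it started. The same bookkeeping also covers a late action: once a compensation has claimed the action's key, an action that arrives afterwards finds the key taken and is skipped.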

You can change TransIn so that it returns an error after the commit:

app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
  barrier := MustBarrierFromGin(c)
  barrier.Call(txGet(), func(tx *sql.Tx) error {
    return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, "")
  })
  return dtmcli.ErrFailure
}))

The final balances will still be correct; see Exceptions and Solutions for details.

Summary

This article has presented a complete, working SAGA transaction solution, which you can use to solve real problems with a few simple modifications to this example.

A detailed description of SAGA can be found here: SAGA

You are welcome to visit https://github.com/dtm-labs/dtm
