Previously, when discussing message queues, we mentioned that delivery guarantees should be one of the selection criteria. Different needs call for different levels of guarantee: the more critical the application, the stronger the guarantee required.
There are three different levels of delivery guarantees, ranging from weak to strong as follows.
- At most once
- At least once
- Exactly once
When we define a delivery guarantee, we actually need to understand its scope. A complete end-to-end delivery guarantee is made up of three different paths that work together.
These three paths are as follows.
- Producer Perspective
- Consumer Perspective
- Sink Perspective
An end-to-end delivery guarantee is determined by the weakest of these three paths. For example, if the producer achieves exactly-once but the consumer and the sink can only achieve at-least-once, then the end-to-end guarantee is at-least-once.
In this article, we will focus on how to achieve an exactly-once guarantee. Therefore, it is necessary to understand the meaning of these three paths and how to handle failures.
Producer Perspective
From the producer's point of view, we always want any message that is sent to be delivered.
Most message queues tell the producer whether a message was delivered successfully: when it is, the producer receives a response. If we receive a response, we can be sure the send succeeded; if we don't, we generally treat it as a failure.
However, if the message was actually delivered but only the response was lost, we have a problem.
The diagram above shows this common failure flow. So how do we avoid sending twice?
There are several approaches.
The first approach is for the producer to maintain a unique constraint on each message, e.g. (producer_id, msg_id). When a send failure occurs, the producer queries the broker's internal storage to determine whether the send actually failed or only the response was lost.
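As a rough illustration of this idea, here is a small self-contained Python sketch in which an in-memory dict stands in for the broker's storage. The function names and the (producer_id, msg_id) lookup are my own illustration, not a real broker API; only brokers that keep a message log can actually answer such a query.

```python
# A toy in-memory "broker" keyed by (producer_id, msg_id); real brokers that keep a
# message log (e.g. Kafka) could answer this lookup, queue-style brokers cannot.
broker_store: dict[tuple[str, int], bytes] = {}

def broker_send(producer_id: str, msg_id: int, payload: bytes) -> None:
    # The broker persists the message; the acknowledgment back to the producer may still get lost.
    broker_store[(producer_id, msg_id)] = payload

def resend_after_ambiguous_failure(producer_id: str, msg_id: int, payload: bytes) -> None:
    # Called when the producer sent a message but never received a response.
    # Query the broker first so that a lost response does not turn into a duplicate send.
    if (producer_id, msg_id) not in broker_store:
        broker_send(producer_id, msg_id, payload)
```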
Nevertheless, not every message queue supports this approach. If messages disappear as soon as a consumer takes them away, as in RabbitMQ, this approach will not work. Conversely, systems like Kafka, which store messages as a log, can offer such queries to producers.
In the second approach, the producer does not try to figure out whether the response was lost; instead it resends the message anyway, but attaches a GUID (globally unique ID) to it. The consumer, for its part, has to check whether that GUID has already been processed and simply ignore the message if it has.
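A minimal sketch of this second approach, assuming the producer attaches a GUID to every message. Here the set of processed GUIDs is kept in memory for brevity; a real consumer would keep it in durable storage, as discussed later in the idempotent-updates section.

```python
import uuid

def make_message(payload: str) -> dict:
    # The producer attaches a globally unique ID to every message it sends.
    return {"guid": str(uuid.uuid4()), "payload": payload}

processed_guids: set[str] = set()

def handle(message: dict) -> None:
    if message["guid"] in processed_guids:
        return  # duplicate caused by a producer resend; ignore it
    # ... actual business logic goes here ...
    processed_guids.add(message["guid"])
```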
If using Kafka, there is another way to guarantee exactly-once: message transactions. Just like with a relational database, the producer declares START TRANSACTION before sending messages and declares COMMIT after they have all been sent. If any message in between fails, the producer does not COMMIT but simply aborts the entire transaction and resends it. In this way, resending after a failure does not result in duplicate messages.
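As a concrete sketch of what this looks like in code, the following uses the confluent-kafka Python client (my own choice, not something the article prescribes); the broker address, topic name, and transactional.id are placeholders.

```python
from confluent_kafka import Producer, KafkaException

producer = Producer({
    "bootstrap.servers": "localhost:9092",   # placeholder broker address
    "transactional.id": "demo-producer-1",   # must stay stable across producer restarts
})
producer.init_transactions()

def send_batch(messages: list[bytes]) -> None:
    producer.begin_transaction()
    try:
        for msg in messages:
            producer.produce("demo-topic", value=msg)
        producer.commit_transaction()
    except KafkaException:
        # Abort and retry the whole batch; consumers reading with
        # isolation.level=read_committed never see the aborted messages.
        producer.abort_transaction()
        raise
```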
Consumer Perspective
This is the scenario that all message queues focus on. However, the delivery guarantee a message queue provides covers only this part of the overall end-to-end guarantee: the consumer perspective.
Most message queues can only provide an at-least-once guarantee: once the consumer has correctly processed a message, it sends an acknowledgment to the message queue, and the message is removed only when the queue receives that acknowledgment.
The problematic flow is as follows.
When Consumer 1 handles Message A but the acknowledgment is lost, Consumer 2 can later fetch the same Message A and handle it again, so the message ends up being handled twice.
Basically, this is a common problem for message queues that provide at-least-once delivery.
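To illustrate where the duplicate comes from, here is a sketch of a typical at-least-once consumer loop, again using the confluent-kafka Python client as an assumed example; the group id and topic are placeholders.

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "demo-group",               # placeholder consumer group
    "enable.auto.commit": False,            # acknowledge manually, only after handling
})
consumer.subscribe(["demo-topic"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # ... handle the message here ...
    consumer.commit(msg)  # the acknowledgment: if the process crashes before this line,
                          # the message is redelivered and handled again
```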
Idempotent Updates
In general, developers try to make message handling as idempotent as possible, so that no matter how many times a message is handled, it does not cause problems.
As mentioned in the previous section, putting a GUID inside each message is an obvious solution. Consumers store the GUIDs of handled messages in durable storage and must verify that a message has not already been handled before handling it.
In addition, consumers must use transactions when updating that storage to avoid duplicate updates.
Let's take MySQL as an example.
START TRANSACTION;
-- Claim the message: the row is updated only if it has not been handled yet.
$RET = UPDATE event SET handled = true WHERE msg_id = 123 AND handled = false;
-- $RET is the number of affected rows; 1 means we are the first to claim the message.
if $RET == 1:
    Do whatever you want;
COMMIT;
Please notice that in the first statement inside the transaction, in addition to msg_id, the WHERE clause must also check handled = false. This is one of the ways to avoid race conditions in MySQL; without this condition, it is still possible to process the same message twice under a race condition.
My previous article explains in detail how MySQL handles race conditions; if you want the details, you can refer to the following.
https://medium.com/interviewnoodle/how-to-avoid-the-race-condition-and-the-negative-value-3f397b2b08e4
Nevertheless, making message handling idempotent is not easy. Besides the consumer, it often requires the cooperation of the producer, which adds coupling and complexity.
Event Sourcing
Another approach is to use event sourcing. To implement event sourcing, the message queue must support replaying the message log, as Kafka does.
Event sourcing means that the consumer's current state is generated by replaying all events, so any failure during event handling can be recovered from by simply replaying the events from the beginning.
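As a minimal illustration (a bank-account example of my own, not from the article), the current state is never stored directly; it is always derived by replaying the event log:

```python
# Replaying the full event log rebuilds the current state from scratch.
events = [
    {"type": "deposit", "amount": 100},
    {"type": "withdraw", "amount": 30},
    {"type": "deposit", "amount": 50},
]

def replay(event_log: list[dict]) -> int:
    balance = 0
    for event in event_log:
        if event["type"] == "deposit":
            balance += event["amount"]
        else:
            balance -= event["amount"]
    return balance

print(replay(events))  # 120, recomputable after any handling failure
```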
However, this approach creates two problems.
- Replaying from scratch is inefficient.
- It is difficult to provide various views of the current state.
To solve the first problem, a checkpoint mechanism is usually used: take a snapshot of the state and record the corresponding offset at regular time intervals or after each batch of handled messages.
When a problem occurs, just restore the latest snapshot and replay from its recorded offset up to the latest data. The checkpoint mechanism greatly improves replay efficiency.
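Below is a rough sketch of the checkpoint mechanism, assuming a local JSON file as the snapshot store and a plain list standing in for the message log; a real consumer would persist the snapshot durably and read the log from the broker.

```python
import json
import pathlib

SNAPSHOT = pathlib.Path("snapshot.json")

def save_checkpoint(state: int, offset: int) -> None:
    SNAPSHOT.write_text(json.dumps({"state": state, "offset": offset}))

def restore_checkpoint() -> tuple[int, int]:
    if SNAPSHOT.exists():
        data = json.loads(SNAPSHOT.read_text())
        return data["state"], data["offset"]
    return 0, 0  # no snapshot yet: replay from the very beginning

def recover_and_resume(log: list[int]) -> int:
    state, offset = restore_checkpoint()
    for i in range(offset, len(log)):   # replay only the tail after the last snapshot
        state += log[i]                 # stand-in for real event handling
        if (i + 1) % 100 == 0:          # checkpoint per batch, not per message
            save_checkpoint(state, i + 1)
    return state
```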
The second problem is usually solved with CQRS, so that each of the various read requirements has its own snapshot to reference. For more information on how to evolve a CQRS architecture, please refer to my previous article.
https://medium.com/interviewnoodle/shift-from-monolith-to-cqrs-a34bab75617e
To sum up, to achieve an exactly-once guarantee with event sourcing, the consumer must be able to record the handled state together with the offsets; whenever an error occurs, the consumer discards the current result and replays from the previous state snapshot to reconstruct the correct result.
Sink Perspective
Sink means the downstream of the message flow. In fact, the sink perspective is a special case of the consumer perspective.
Suppose the sink is a database. The consumer itself can already achieve an exactly-once guarantee through event sourcing.
But this is not enough, because during recovery the consumer will propagate the events downstream again, and the downstream ends up receiving duplicate results for the same events.
In the above process, the consumer itself can guarantee exactly-once by restoring the state, but the downstream database has been written twice, which is still a problem.
Honestly, there is no silver bullet on the sink side that completely achieves an exactly-once guarantee without side effects.
Let's look at Solution 1: output the accumulated results only after the consumer has successfully created a checkpoint.
Since the consumer is guaranteed exactly-once through the checkpoint mechanism, outputting results only when a checkpoint is created keeps the consumer and the sink consistent. It is important that creating the checkpoint and writing the output are done in a single transaction.
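A sketch of Solution 1 follows, assuming SQLite plays the role of both the checkpoint store and the sink so that creating the checkpoint and writing the output can share one transaction; in practice the two stores are often separate, which is exactly what makes this hard.

```python
import sqlite3

db = sqlite3.connect("sink.db")
db.execute("CREATE TABLE IF NOT EXISTS checkpoint (id INTEGER PRIMARY KEY, offset INTEGER)")
db.execute("CREATE TABLE IF NOT EXISTS results (key TEXT PRIMARY KEY, value INTEGER)")

def flush_on_checkpoint(offset: int, accumulated: dict[str, int]) -> None:
    # One transaction: either both the checkpoint and the output land, or neither does.
    with db:
        db.execute("INSERT OR REPLACE INTO checkpoint (id, offset) VALUES (1, ?)", (offset,))
        db.executemany(
            "INSERT OR REPLACE INTO results (key, value) VALUES (?, ?)",
            list(accumulated.items()),
        )
```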
However, Solution 1 has an obvious drawback: message handling loses its real-time nature and generates write spikes in the database. Because checkpoints are created per batch rather than per message, the database never contains the most current results. In addition, the output itself happens in batches, which produces a large burst of writes at regular intervals.
So let's look at Solution 2, which makes the database updates idempotent.
The straightforward way is to give every database update a primary key and overwrite any existing row with the same primary key.
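Here is a sketch of the idempotent-update idea, again using SQLite so the example is self-contained; in MySQL, INSERT ... ON DUPLICATE KEY UPDATE (or REPLACE INTO) achieves the same effect.

```python
import sqlite3

db = sqlite3.connect("sink.db")
db.execute("CREATE TABLE IF NOT EXISTS balance (account TEXT PRIMARY KEY, amount INTEGER)")

def upsert(account: str, amount: int) -> None:
    # Writing the same primary key twice overwrites the row instead of duplicating it.
    with db:
        db.execute(
            "INSERT OR REPLACE INTO balance (account, amount) VALUES (?, ?)",
            (account, amount),
        )

upsert("alice", 120)
upsert("alice", 120)  # a replayed event: the row is simply overwritten
```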
However, Solution 2 also has drawbacks. Firstly, it is not always easy to define a primary key for each update. Secondly, when a consumer fails and starts a replay, the saved results may time travel: they are reverted to a past state and then updated forward again.
How long this inconsistency lasts depends on how often checkpoints are created and how efficiently the replay runs; it can take up to a couple of minutes.
Both Solution 1 and Solution 2 carry potential risks, but if I had to choose, I would prefer Solution 2.
Conclusion
In this article, we take a closer look at the semantics of exactly-once and understand how end-to-end exactly-once can be implemented.
Starting from the producer, a message is sent into the message queue, then read and handled by the consumer, and finally the result is output downstream.
I have to say the exactly-once guarantee is a holy grail that is very difficult to achieve; every stage of the message flow has to be handled correctly in order to fully achieve an end-to-end exactly-once guarantee.
I believe there are original designs beyond those listed in this article that I have not come across. If you have your own implementation, please feel free to share it with me.