Oleg Potapov
How Kafka applies zombie fencing

Distributed systems are complicated; we all know that. And among thousands of other problems, there is the problem of zombie processes. Today we’ll discuss when it occurs and how to solve it, using Kafka as an example.

The Problem

“Zombie process” is a term that comes from the operating systems field [1]. In Unix-like systems, a process is called a “zombie” when it has finished its execution but still remains in the process table. In distributed systems, however, “zombie” means almost exactly the opposite: a zombie process is one that other components of the system consider “dead” but that hasn’t actually finished its execution. The most common cause is a temporary network issue that makes the process unavailable for some period of time. After the issue is resolved the process comes back online, but during the time it was unavailable the system has already launched (or elected) its replacement.

One example is a replication system with one primary node that accepts both reads and writes and several secondary read-only nodes. When the primary node loses its connection to the other nodes, one of the replicas becomes the new leader and takes over its functions. At the same time, the old leader may still be reachable by the application and can still process write requests. But these requests are not replicated to the other nodes since the connection is lost, and the zombie leader doesn’t receive new data from the other nodes either. When the connection is restored, the data on the nodes has diverged and it is hard to synchronize them without data loss. This problem is also called split-brain [2].

Split-brain may be dangerous for the whole system's consistency since both “brains” are active and can commit conflicting changes at the same time.

split-brain problem

Possible solutions

The scenario described above is not the only possibility. We can imagine another one. Say we have two data centers and five database replicas: two in the first data center (one of them the current leader) and three in the second. Suddenly the network connection between the data centers is lost, while all the nodes keep working and remain accessible from other places. So what do we do? Should the three replicas in the second data center elect a new leader? Keeping in mind that it is often very hard to determine whether a node is down or just temporarily unavailable, we can imagine plenty of other complicated scenarios.

That’s why different software products have different solutions for this problem, depending on the purpose of the software and its main function (storing or publishing data).
Still, there are three main principles most of these solutions are based on:

  1. Third-party actor (or witness) - an independent application outside the cluster that can check the availability of all the nodes. Several types of applications can be used for this purpose: a load balancer, Zookeeper, or some other dedicated cluster software. There are various ways such an application can protect the cluster from the split-brain problem: it can check node availability with heartbeats, or it can register the nodes and allow only one primary node to be registered at a time.
  2. Consensus - the decision about the current leader is based on a vote among the nodes. To be promoted to primary, a replica should get a majority of votes, or quorum [3]. This approach is used in MongoDB replica sets. Another example is Hazelcast, which uses the quorum approach for write operations to protect itself from split-brain [4]. When an operation can’t be performed on a sufficient number of cluster members, it raises an exception.
  3. Generation numbers - a generation number is available across the cluster and monotonically increases every time the leader changes. All the nodes accept only actions performed with the current value of this number. When the old leader is disconnected from the other nodes, it keeps the old generation number and can no longer apply changes.

These are the three basic principles commonly used, either separately or in combination.
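To make the quorum idea concrete, here is a minimal sketch in plain Python. It models no real cluster or library API; all class and method names are illustrative. A write succeeds only when a majority of replicas acknowledge it, so the minority side of a partition refuses to accept changes:

```python
# Minimal sketch of quorum-based writes: a write succeeds only when a
# majority of replicas acknowledge it. All names here are illustrative,
# not part of any real cluster software.

class QuorumError(Exception):
    pass

class Cluster:
    def __init__(self, replicas):
        self.replicas = set(replicas)    # all known replica ids
        self.reachable = set(replicas)   # replicas we can currently contact

    def partition(self, unreachable):
        """Simulate a network partition: some replicas become unreachable."""
        self.reachable = self.replicas - set(unreachable)

    def write(self, value):
        acks = len(self.reachable)              # replicas acknowledging the write
        majority = len(self.replicas) // 2 + 1  # quorum threshold
        if acks < majority:
            # Like Hazelcast's split-brain protection: refuse to apply the
            # operation rather than risk committing conflicting changes.
            raise QuorumError(f"only {acks}/{len(self.replicas)} replicas reachable")
        return acks

cluster = Cluster(["r1", "r2", "r3", "r4", "r5"])
print(cluster.write("a"))               # all 5 replicas reachable: accepted

cluster.partition(["r3", "r4", "r5"])   # we are on the minority side now
try:
    cluster.write("b")                  # only 2 of 5 reachable: rejected
except QuorumError as e:
    print("rejected:", e)
```

Note that with five replicas the majority is three, so at most one side of any partition can keep accepting writes; this is exactly what prevents two “brains” from committing conflicting changes.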

What does Kafka do?

Apache Kafka is a message broker that, among other features, offers an exactly-once messaging guarantee. I described how it works in general in one of my previous articles. Another guarantee Kafka provides is message ordering within a single partition. These two features put additional demands on message producers: each producer should be unique inside the cluster and use unique sequential message ids [6]. Thus, it’s critical for the broker to detect “zombie” producers and reject the messages they try to send.

Each transactional producer in Kafka has its own transactionalID, which is registered in the Kafka cluster with the first operation after the producer starts. There is also an epoch number associated with the transactionalID, stored as metadata in the broker. When a producer registers an already existing transactionalID, the broker assumes that it’s a new instance of the producer and increments the epoch number. The producer’s epoch number is included in every transaction it begins, and if it is lower than the current epoch stored in the broker, the Transaction Coordinator rejects the transaction.

Kafka zombie producer fencing
Let’s return to the issue described above and see how Kafka handles it. When the first producer instance temporarily fails and another instance appears, the new one invokes the initTransactions method, which registers the same transactionalID and receives a new epoch number. This number is included in its transactions and checked by the Transaction Coordinator. The check succeeds for the new producer, but when the old instance comes back online and tries to begin a transaction, the coordinator rejects it because it carries the old epoch number. In this case, the producer receives a ProducerFencedException [5] and should finish its execution.
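The epoch check itself is simple enough to sketch in a few lines. Below is a toy model in plain Python (the class and method names mirror Kafka’s terminology but are purely illustrative; the real logic lives inside the broker): registering a transactionalID bumps its epoch, and any producer presenting a stale epoch is fenced.

```python
# Toy model of Kafka's epoch-based producer fencing. The names mirror
# Kafka's terminology (initTransactions, ProducerFencedException) but
# this is an illustration, not the real broker code.

class ProducerFencedError(Exception):
    pass

class TransactionCoordinator:
    def __init__(self):
        self.epochs = {}  # transactionalID -> current epoch

    def init_transactions(self, transactional_id):
        """Register a producer instance; re-registration bumps the epoch."""
        epoch = self.epochs.get(transactional_id, -1) + 1
        self.epochs[transactional_id] = epoch
        return epoch

    def begin_transaction(self, transactional_id, epoch):
        """Accept a transaction only from the latest producer instance."""
        if epoch < self.epochs[transactional_id]:
            raise ProducerFencedError(
                f"epoch {epoch} is stale, current is "
                f"{self.epochs[transactional_id]}")

coordinator = TransactionCoordinator()
old_epoch = coordinator.init_transactions("order-service")  # first instance: epoch 0
new_epoch = coordinator.init_transactions("order-service")  # replacement: epoch 1

coordinator.begin_transaction("order-service", new_epoch)   # accepted
try:
    coordinator.begin_transaction("order-service", old_epoch)  # zombie: fenced
except ProducerFencedError as e:
    print("fenced:", e)
```

The key property is that the zombie never needs to be notified that it was replaced: simply by holding an outdated epoch, its next transactional operation fails on its own.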

Another thing that deserves a separate mention is unfinished transactions. When a new producer instance registers itself in the broker, it can’t start until all the transactions of the previous instance are completed. To do that, the Transaction Coordinator finds all the transactions with the associated transactionalID that have no COMMITTED message in the transaction log. (I briefly described how the Transaction Coordinator aborts and commits a transaction in the article about Kafka’s exactly-once semantics [6].) If a PREPARE_COMMIT message has been written to the transaction log, the commit process has already started, and the coordinator completes it. Otherwise, the transaction is aborted.
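The decision rule above can be sketched as a small function over the transaction log. The record names (ONGOING, PREPARE_COMMIT, COMMITTED, ABORTED) mirror the states described in the paragraph; the function itself is an illustrative simplification, not actual coordinator code:

```python
# Sketch of how the coordinator resolves unfinished transactions when a
# new producer instance registers. The state names mirror the transaction
# log records described above; everything else is illustrative.

def resolve_pending(transaction_log):
    """Decide the fate of each open transaction for a transactionalID.

    transaction_log: list of (txn_id, state) entries, where state is one
    of "ONGOING", "PREPARE_COMMIT", "COMMITTED", "ABORTED".
    Returns {txn_id: action} for transactions that still need a decision.
    """
    latest = {}
    for txn_id, state in transaction_log:
        latest[txn_id] = state  # the last record for a transaction wins

    actions = {}
    for txn_id, state in latest.items():
        if state in ("COMMITTED", "ABORTED"):
            continue  # already finished, nothing to do
        elif state == "PREPARE_COMMIT":
            actions[txn_id] = "complete-commit"  # commit already started: finish it
        else:
            actions[txn_id] = "abort"  # never reached prepare: roll back
    return actions

log = [
    ("txn-1", "ONGOING"), ("txn-1", "PREPARE_COMMIT"),
    ("txn-2", "ONGOING"),
    ("txn-3", "ONGOING"), ("txn-3", "PREPARE_COMMIT"), ("txn-3", "COMMITTED"),
]
print(resolve_pending(log))
# txn-1 reached PREPARE_COMMIT, so its commit is completed; txn-2 never
# did, so it is aborted; txn-3 is already committed and left alone.
```

Only after every pending transaction is resolved this way does the new producer instance get to start its own transactions.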

Conclusion

The split-brain issue can be a serious challenge in a distributed system, mainly because once it has occurred, it is very hard to fix the failures it has created. That’s why it’s better to think ahead. And if you can’t eliminate the possibility, you should at least have an emergency plan.

Luckily, most modern software products designed for distributed systems take responsibility for handling these errors, and Apache Kafka is one of them. Its architecture already contains a Transaction Coordinator module, which runs inside every broker and behaves as a third-party actor that internally registers producers. This makes fencing “zombie” producers quite an easy operation: all you need to do is assign an application-unique transactionalID to the producer, and everything else is handled by the broker. Thus, using Kafka, you can be sure you are protected from zombies, but never too sure: this problem may appear in other places of your application!

Links

  1. https://en.wikipedia.org/wiki/Zombie_process
  2. https://en.wikipedia.org/wiki/Split-brain_(computing)
  3. https://en.wikipedia.org/wiki/Quorum_(distributed_computing)
  4. https://docs.hazelcast.com/imdg/4.2/network-partitioning/split-brain-protection
  5. https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/common/errors/ProducerFencedException.html
  6. https://oleg0potapov.medium.com/how-kafka-achieves-exactly-once-semantics-57fdb7ad2e3f
  7. https://developer.confluent.io/tutorials/message-ordering/kafka.html
  8. https://www.confluent.io/blog/transactions-apache-kafka/
