Pierre Guimon

Kafka consumer rebalancing - impact on Kafka Streams consumer performance

Introduction

Kafka consumer rebalancing is part of the lifecycle of any Kafka consumer group.

We will first define what Kafka consumer rebalancing is.

Then, we will see how it can impact the performance of Kafka Streams consumers in real applications and how we can mitigate it.

What is Kafka consumer rebalancing?

Kafka consumers use the concept of consumer groups to allow a pool of processes to divide the work of consuming and processing records.

Each member of a consumer group consumes from a unique set of the Kafka topic's partitions.

Multiple consumer groups subscribed to a topic

Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved from existing consumers to the new one. This is known as rebalancing the group.

Group rebalance requires two actors:

  • On the server side: each group is managed by a single Kafka broker called the group coordinator.
  • On the client side: one of the consumers is designated as the group leader and computes the partition assignment using an implementation of the interface:
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor

defined through the consumer configuration property:

partition.assignment.strategy

You can find the existing partition assignment strategies in the Kafka consumer configuration properties.
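For illustration, here is a minimal sketch of how this property can be set on a plain Java consumer (the bootstrap servers, group id and topic name below are made-up placeholders, and the RoundRobinAssignor is just one possible choice):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");         // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // partition.assignment.strategy: one of the built-in ConsumerPartitionAssignor implementations
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-1"));           // placeholder topic
            // consumer.poll(...) loop would go here
        }
    }
}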

Prior to Kafka 2.4, the default consumer assignors were RangeAssignor or RoundRobinAssignor.

When a new consumer instance is deployed, the group coordinator notifies the other members of the same group to release their resources (commit offsets and release the assigned partitions) using the rebalance protocol, and the allocation of resources (topic partitions) is recalculated across all group members (the existing ones plus the new one in this case).
The mechanism is the same when the group coordinator detects the loss of one of the members (e.g. an instance crash where no heartbeats are received for session.timeout.ms, or a leave group signal when a member exceeds max.poll.interval.ms while processing records).

Below is an example of a rebalance when a third consumer instance (consumer 3) joins a consumer group.

Kafka default rebalancing

The challenging step with this rebalancing is what is called the
synchronization barrier: at this step, all resources (topic partitions) must be released. This is done by each instance before sending the JoinGroup request.

So until the SyncGroup response is received, no data will be processed by the consumers, and, as a result, processing of events from a topic happens with some delay. This is what we call a stop-the-world operation.

This is the most secure way to assign partitions. Some business cases, like offline/asynchronous event processing, can tolerate such rebalancing, but you can imagine that it doesn't fly for real-time event processing.

Now let's imagine that we are running a Kafka Streams application on a Kubernetes cluster. There are several situations in which rebalancing would occur:

  • Kubernetes node eviction: a node is being evicted from the Kubernetes cluster, any instances of our application running on this particular node would need to be restarted on another node of the cluster. This would induce JoinGroup requests from the newly deployed instances of the application, ultimately resulting in rebalancing.

  • Kubernetes pod eviction: an applicative pod is being evicted from a node of the Kubernetes cluster. Same as previously, this would result in rebalancing for the newly created applicative instance.

  • Application rolling upgrade: Similar to what might happen unexpectedly with a failure, an intentional application rolling upgrade could be triggered for a software update for instance.

  • Application scaling up/down: if the Kafka Streams application needs to scale up or down, a rebalance would occur to accommodate the added or removed consumers.

Are there any alternative ways to rebalance that do not suffer from the stop-the-world effect?

Incremental cooperative rebalancing

These reasons, amongst others, motivated the need for a more robust rebalancing protocol.
With Kafka 2.4, the incremental cooperative rebalancing protocol was introduced, and its main aim is to avoid stopping the world when rebalancing.
It relies on a sticky assignment mechanism (the CooperativeStickyAssignor): each consumer instance keeps, as much as possible, the partitions it was already processing across a rebalance.
This protocol releases only the topic partitions that will be processed by another consumer instance, and not the others, using two rebalance phases (or two consecutive rebalances):

  • The aim of the first rebalance is to revoke the partitions that will move from one instance to another. The group leader computes the new assignment and sends each consumer only its current partitions minus those to revoke.
  • During the second rebalance, consumers send a second JoinGroup request so that the revoked (now unassigned) partitions can be assigned.

Below is an example of incremental cooperative rebalancing when a third consumer instance (consumer 3) joins a consumer group.

Kafka incremental cooperative rebalancing

More details around the motivations for implementing the incremental cooperative rebalancing are available on this Kafka confluence page. You can find the implementation details on this other Kafka confluence page.
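For a plain Java consumer, this protocol is opted into by selecting the CooperativeStickyAssignor, a one-line change to the consumer configuration sketch shown earlier (Kafka Streams uses its own internal assignor and, as we will see below, enables the cooperative protocol by default in the version we use):

import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

// Opt a plain consumer into incremental cooperative rebalancing
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());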

Impact of rebalancing on performance

Now that we have described what Kafka consumer rebalancing is, let's see how it impacts performance on a Kafka Streams based platform.

Test protocol

Let's consider the following simple example:

Kafka Streams microservices

We have a first microservice with 3 applicative instances deployed on a Kubernetes cluster. This service consumes events from a single topic #1 using Kafka Streams 3.1. This topic contains 48 partitions and 3600 events per second are published on it.
On each of the 3 instances we use the Kafka Streams configuration property:

num.stream.threads = 16

Hence, on a single app instance we process events from 16 partitions with 16 Kafka Streams threads.
This microservice only enriches the records it processes with additional information, so the number of events stays the same in the second topic.
The processing of a single event takes around 1 millisecond and the stream is stateless.
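As a rough sketch, the Streams configuration and topology of this first microservice could look like the following (the application id, topic names and the enrichment step are made up for illustration; only num.stream.threads reflects the actual setup described above):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class EnrichmentService {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "microservice-1");        // placeholder application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 16 stream threads per instance: 3 instances x 16 threads = 48 = number of partitions of topic #1
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 16);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("topic-1");              // placeholder input topic
        // Stateless enrichment: the number of events is unchanged, only the payload grows
        events.mapValues(value -> value + "|enriched")                           // stand-in for the real enrichment
              .to("topic-2");                                                    // placeholder output topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}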

We plug in a second microservice with 4 applicative instances, each of them consuming from a single topic #2 of 64 partitions using Kafka Streams 3.1. We use the same Kafka Streams configuration property, and, as a result, each instance processes events from 16 partitions using 16 Kafka Streams threads. It also receives 3600 events per second, produced by the first microservice.
The processing of a single event takes around 3.5 milliseconds and the stream is stateless.

We are using Kafka Streams 3.1, in which incremental cooperative rebalancing is used by default.

We will be performing a rolling update of the first microservice, simulating a software update, to see the impact on the platform while injecting 3600 events per second on topic #1 for 30 minutes.

Monitoring

The microservices are monitored via Prometheus and expose Kafka Streams metrics amongst others.
Here are the metrics used to monitor the impact of the Kafka consumer rebalancing mechanism (a small Java sketch for reading the underlying client metrics follows the list):

  • kafka_stream_processor_node_record_e2e_latency_avg: The average end-to-end latency of a record, measured by comparing the record timestamp when it was first created with the system time when it has been fully processed by the node. The Kafka rebalancing should impact this metric as some partitions being rebalanced won't get consumed anymore during rebalancing, and, as a result, the average end-to-end latency of a record should increase.
  • kafka_consumer_coordinator_rebalance_latency_avg: The average time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded.
  • kafka_consumer_fetch_manager_records_lag_avg: The average records lag. Lag is the number of records available on the broker but not yet consumed. An increasing value over time is the best indication that the consumer group is not keeping up with the producers. The Kafka rebalancing should impact this metric as some partitions being rebalanced won't get consumed anymore during rebalancing, and, as a result, the lag should increase.
  • kafka_consumer_fetch_manager_records_lag_max: The max records lag.
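These Prometheus series are exported from the client-side Kafka metrics (e.g. rebalance-latency-avg in the consumer-coordinator-metrics group, records-lag-avg and records-lag-max in the consumer-fetch-manager-metrics group). As a small sketch, assuming a running KafkaStreams instance, the underlying values can also be inspected directly from the application:

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class StreamsMetricsDump {
    // Print the rebalance latency and consumer lag metrics of a running KafkaStreams instance
    public static void dumpRebalanceAndLagMetrics(KafkaStreams streams) {
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        metrics.forEach((name, metric) -> {
            if (name.name().contains("rebalance-latency") || name.name().contains("records-lag")) {
                System.out.printf("%s / %s = %s%n", name.group(), name.name(), metric.metricValue());
            }
        });
    }
}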

Performance test

Since the Kafka consumer incremental cooperative rebalancing is activated by default for our version of Kafka Streams, we wanted to force the Kafka consumer eager rebalancing mechanism to see the advertised positive impact of the incremental cooperative rebalancing.
The Kafka consumer eager rebalancing is the former rebalancing protocol that was in place before the Kafka consumer incremental cooperative rebalancing was introduced and set as default.

To do so we simply need to add the following Kafka Streams configuration property to the deployment configuration of our first microservice:

upgrade_from: 2.3

This ensures that the Kafka cooperative rebalancing is not activated anymore since it didn't exist in this version of Kafka.
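In Java terms, this corresponds to the upgrade.from Streams property; a minimal sketch, assuming the same props object as in the Streams configuration shown earlier:

// Forces the consumers back to the eager rebalancing protocol, as if upgrading from Kafka Streams 2.3
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");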

In the following graphs we compare the impact of Kafka incremental cooperative rebalancing to the Kafka eager rebalancing.

  • Kafka Streams requests/sec processed per pod instance on the microservice #1 (1200 requests per pod x 3 = 3600 events/sec):

Kafka Streams requests/sec processing

  • Kafka Streams average/max end-to-end latency time of a record seen on the microservice #2:

Kafka Streams avg-end-to end latency

Kafka Streams max end-to-end latency

The average/max end-to-end latency of a record is about the same whether we use eager or cooperative rebalancing, and it is high: ~40 sec. For context, the average end-to-end latency seen on microservice #2 when no rebalancing is occurring is around 50 milliseconds!

  • Kafka Streams total max lag per topic (in yellow the topic #1 corresponding to the topic from which the microservice #1 is consuming):

Kafka Streams total max lag per topic

We do see more lag (~100k elements) with the eager rebalancing compared to the cooperative rebalancing (~40k elements). Without rebalancing, no lag was observed at all.

  • Kafka group rebalance average time on the microservice #1:

Kafka group rebalance average time

The rebalance average time is the same (~40sec) when we compare the eager rebalancing to the cooperative rebalancing.

With Kafka eager rebalancing and its stop-the-world effect, we can see that while a rebalance is in progress the max lag goes higher than with cooperative rebalancing, which is expected since in incremental cooperative mode some consumers are still able to consume.

Overall, the lag is spread out over time when using cooperative rebalancing, and the average end-to-end latency of a record is high in both cases.

What we can take away from this test is that cooperative rebalancing allows some consumers to still consume during a rebalance, but overall the average end-to-end latency of a record is pretty high in both cases.

We have seen that some consumers get rebalanced several times with cooperative rebalancing during the rolling update. Their partitions are not consumed for a significant amount of time, which induces a high average end-to-end record latency on those partitions, whereas on other partitions which didn't get rebalanced as often, the average end-to-end record latency is rather low.

When the consumers assigned to partitions with a high number of backed-up messages restart after the rebalance, they have to dequeue far more items than the others, hence the message consumption rate on those consumers increases for a short period of time until the lag is absorbed.

In the end, incremental cooperative rebalancing reduces the overall max lag on topic #1, but the average end-to-end latency of a record is not improved and remains high in both cases.
You can easily imagine the impact of Kafka rebalancing if each deployment rollout increases the average end-to-end latency of a record from 50 ms to 40 sec! In real-time use cases with strong end-to-end latency requirements, this doesn't fly.

Mitigating the impact of Kafka consumer rebalancing

Let's see how we can decrease the impact of Kafka rebalancing through various tips and how they have impacted our platform.

Keep up-to-date with Kafka Streams latest versions

With the first versions of the Apache Kafka client (2.4) using the CooperativeStickyAssignor (incremental cooperative rebalancing), no huge difference was observed in terms of performance, but this laid the groundwork for a multitude of changes delivered in later versions (2.6) and still to come. That is why it's essential to upgrade frequently to the latest release.

Decrease Consumer Session Expiration

There is a Kafka Streams configuration property named:

session.timeout.ms

It corresponds to the timeout used to detect client failures when using Kafka's group management facility.

The client sends periodic heartbeats to indicate its liveness to the broker.

If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.

Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

In the Kafka Streams 3.1 version that we are using, the default value is 45 sec.

If an application instance becomes unavailable, the rebalance will only happen after this default 45 sec, eventually inducing a lag in event consumption.

In conjunction with this property, there is another Kafka Streams configuration property that requires an update:

heartbeat.interval.ms

It corresponds to the expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.

Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.

The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value.

It can be adjusted even lower to control the expected time for normal rebalances.

The default heartbeat interval is 3 sec in the Kafka Streams 3.1 version that we are using.

We need to be careful with these settings: lowering them increases the probability of rebalances occurring on a daily basis, and consumers might hang in long rebalances, depending on network quality and stability.

For our testing we have set the following values:

session.timeout.ms=6000
heartbeat.interval.ms=1500
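In a Kafka Streams application these are consumer-level settings; one way to pass them (a sketch, assuming the same props object as in the Streams configuration shown earlier) is through the consumer prefix helper of StreamsConfig:

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Detect a lost instance after 6 s instead of the 45 s default, with heartbeats every 1.5 s
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 6000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 1500);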

Sending leave group request

By default, Kafka Streams doesn't send a consumer leave group request on graceful application shutdown. As a result, messages from the partitions that were assigned to the terminating applicative instance will not be processed until the session of this consumer expires (after session.timeout.ms), and only after expiration will a new rebalance be triggered.

By default in Kafka Streams 3.1, we have session.timeout.ms=45000, which means that during a single applicative instance restart, messages from some partitions may not be processed for up to 45 seconds, which is painful for real-time requirements.

You can find more details about a discussion on this property in this Kafka Jira ticket.

The default value is false in Kafka Streams configuration.

For our testing we have set the following values:

internal.leave.group.on.close=true
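Since this is an internal consumer configuration with no dedicated StreamsConfig constant, it can be passed the same way as the previous settings (a sketch; whether Kafka Streams honors this override depends on the version, so treat it as an assumption to verify against your setup):

// Ask the consumer to send a LeaveGroup request on close, triggering an immediate rebalance
props.put(StreamsConfig.consumerPrefix("internal.leave.group.on.close"), "true");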

Tweak deployment rollout strategy

Our application is deployed on a Kubernetes cluster and as such we can easily control the number of applicative instances which are created during a rolling update.

One can use the following Kubernetes deployment properties to achieve this:

.spec.strategy.rollingUpdate.maxSurge (absolute/percentage, default: 25%)
.spec.strategy.rollingUpdate.maxUnavailable (absolute/percentage, default: 25%)

The more instances we roll out at the same time, the more partitions need to be reassigned through Kafka rebalancing, eventually leading to a high end-to-end record latency and a higher max lag per topic.

Decreasing the number of instances which are started at the same time can mitigate the impact of Kafka rebalancing.

For our testing we have set the following values:

.spec.strategy.rollingUpdate.maxSurge=1
.spec.strategy.rollingUpdate.maxUnavailable=0

In that case, a single applicative instance is started at a given time. The deployment process will certainly be longer, but overall we expect more stable performance on the platform, which is what we are looking for in real-time use cases.

Results

We have applied the above recommendations and run the same test protocol as before, except that we launched 4 rolling updates of microservice #1 in 30 minutes in order to gather more data points. Here are the results:

  • Kafka Streams requests/sec processed per pod instance on the microservice #1 (1200 requests per pod x 3 = 3600 events/sec):

Kafka Streams requests/sec processing

We no longer see the impact of Kafka rebalancing on the number of events processed by microservice #1.

  • Kafka Streams average/max end-to-end latency time of a record seen on the microservice #2:

Kafka Streams avg-end-to end latency

The average end-to-end latency of a record is far less impacted by the Kafka rebalancing than before. We went down from 40 sec to ~750 ms on average during a Kafka rebalancing.

  • Kafka Streams total max lag per topic (topic #1 corresponding to the topic from which the microservice #1 is consuming):

Kafka Streams total max lag per topic

We have far less lag than before the optimizations. We went down from ~40k elements with incremental cooperative rebalancing to ~150 elements.

  • Kafka group rebalance average time:

Kafka group rebalance average time

The rebalance average time went down from 40sec to ~1sec with the optimizations.

Here is a table summing-up the results of the optimizations on the platform:

|                                                  | Without optimizations | With optimizations |
| ------------------------------------------------ | --------------------- | ------------------ |
| microservice #1 - avg group rebalance time       | 40 sec                | 1 sec              |
| topic #1 - max lag                                | 40,000 events         | 150 events         |
| microservice #2 - avg end-to-end record latency   | 40 sec                | 700 ms             |

The Kafka consumer rebalancing mechanism has a non-negligible impact on the performance of real-time applications. It can be mitigated by tuning some Kafka consumer properties, but it remains present.

References

  • LinkedIn Engineering team talk on Kafka rebalancing video: Consumer Group Internals: Rebalancing, Rebalancing....
  • Jun Rao quick talk on the Kafka consumer group video: Apache Kafka Consumers and Consumer Group Protocol
  • Gwen Shapira talk on Kafka rebalancing video: The Magical Rebalance Protocol of Apache Kafka
