DEV Community

William Ahrons
William Ahrons

Posted on

A few thoughts about Kafka

In this post, I want to share a few pitfalls that I faced while working with Kafka.

Rebalacing

Kafka has a timeout configuration, that is if you are unable to process the set of messages that the instance was given it will rebalance. So if the instance is for some reason slow, Kafka will give the same partition to another instance/consumer however the previous partition is still processing the message, in this scenario, both partitions will process the same message, and this process will continue until it's in all partitions.

How to solve, first try to make your process more performant, if this isn't possible try to increase the timeout, if this doesn't solve the problem there is this class in Kafka API that you can commit the current offset to make the other instance to not pick up the previous message.

Fanout how to do it?

Fanout is a common pattern when we are talking about messaging driven apps. But with Kafka there are only two ways that I know of that we can have this in our system.

  • First is to have multiple consumer groups in place, but in this case, both groups will receive the same set of messages.

  • Second is to change the messageKey hash to be different for each message making Kafka send each message to a different partition, in this way we don't know how many messages Kafka will send to each partition making unpredictable if it will be a round-robin or not, since the hash function can vary depending on the data that you are going to send to Kafka to create the hash.

How to count the messages in the topic

In Kafka we have this concept called offset of the partition and the committed offset, we don't have the number of messages in the topic. In the end, you only have the LAG between the committed offset against the partition offset.

$ LAG = (partitionOffset - commitedOffset) by partition by topic

Using Kafka API in Java:


public List<OffsetAgg> getLAG() {
ListConsumerGroupOffsetsResult listOffsets = adminClient.listConsumerGroupOffsets("consumer-group");
        return listOffsets.partitionsToOffsetAndMetadata()
                .get()
                .entrySet()
                .stream()
                .map((entry) -> {
                    return OffsetAgg.builder()
                            .partition(entry.getKey().partition())
                            .topicName(entry.getKey().topic())
                            .committedOffset(entry.getValue().offset())
                            .build();
                }).collect(groupingBy(OffsetAgg::getTopicName))
                .entrySet()
                .stream()
                .flatMap(entry -> getPartitionsOffset(env, entry.getValue()).stream())
                .collect(Collectors.toList());

    }

    public List<OffsetAgg> getPartitionsOffset(Env env, List<OffsetAgg> offsetAggs) {
        KafkaConsumer consumer = consumerMap.get(env);
        List<TopicPartition> topicPartitions = offsetAggs
                .stream()
                .map(agg -> buildTopicPartition(agg))
                .collect(Collectors.toList());
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

        return offsetAggs
                .stream()
                .map(agg -> agg.toBuilder().partitionOffset(endOffsets.get(buildTopicPartition(agg))).build())
                .collect(Collectors.toList());
    }

    private TopicPartition buildTopicPartition(OffsetAgg agg) {
        return new TopicPartition(agg.getTopicName(), agg.getPartition());
    }

    @AllArgsConstructor
    @Getter
    @Builder(toBuilder = true)
    public static class OffsetAgg {
        private Long partitionOffset;
        private Long committedOffset;
        private Integer partition;
        private String topicName;
    }

I made this post to help developers to understand a few things while using Kafka.
Rebalancing is nice, but you will need to change the default timestamp for consuming the pre-fetch messages to address your needs or add a configuration in Kafka to commit the offset before given the partition to another instance.
Fanout is a common pattern to implement but be aware that in Kafka is not that easy to do it, in order to do that you will need to leverage the ordering mechanism or add consumer groups in the partitions.
I'm used to RabbitMq so having the count of the messages is something that I usually look into to have it, so in Kafka since it has partition offset and consumer offset is a little bit tricky to have the actual number of how many messages are in the partition, in the code, I'm able to understand the offset and derive from that the number of messages in Kafka and in each consumer.

Top comments (0)