DEV Community

Concurrency in Spring's StreamListener and Kafka

Sergio Garcia Moratilla (sgmoratilla) ・2 min read

TL;DR: go to "Option 2: use configuration".

Another too fast, too furious post. I have spent a few hours trying to make my event processor multi-threaded, and it's so damn easy that I don't want anyone to spend more than a few minutes.

We are using the Spring Cloud Stream layer to configure our Kafka consumers.

For example, a configuration for a processor named 'reservations-input' connected to a Kafka topic 'reservations-topic' would be similar to this:
      content-type: application/json
      destination: reservations-topic
      group: consumer-service-group

And your class to start processing those events:

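@EnableBinding(MessagingConfiguration.ReservationTopic.class)
public class MessagingConfiguration {
    public interface ReservationTopic {

        String INPUT = "reservations-channel";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

public class ReservationProcessor {
    @StreamListener(MessagingConfiguration.ReservationTopic.INPUT)
    public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
        // your stuff
    }
}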

Easy peasy. Only problem here is concurrency.

If you have used Kafka before, you know that the number of partitions in your topic limits the concurrency.
Within a consumer group, each partition is read by exactly one consumer.
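
To make that limit concrete, here is a minimal sketch in plain Java. It is not Kafka's real assignor (Kafka ships several strategies, and this is none of them); the class and method names are hypothetical and only mimic the rule that a partition is owned by one consumer of the group at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka or Spring: it only mimics the rule
// that a partition is owned by one consumer of the group at a time.
class PartitionAssignmentSketch {

    // Deals partitions 0..partitions-1 out to the consumers in round-robin order.
    // Assumes consumers >= 1.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    // Counts the consumers that received at least one partition; the rest sit idle.
    static long activeConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream()
                .filter(list -> !list.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 1)); // [[0, 1, 2]]
        System.out.println(assign(3, 3)); // [[0], [1], [2]]
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]
    }
}
```

With 3 partitions, a single consumer does all the work, 3 consumers get one partition each, and any consumer beyond the third has nothing to read.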

I don't know whether (or where) I read that, but I assumed that my application would spawn as many threads/consumers as my topic has partitions. I was wrong. By default, Spring creates only a single-threaded processor.

Solutions? Run more instances of your application, or configure ConcurrentKafkaListenerContainerFactory to spawn more threads (see

Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.

The only hint I found in the documentation or on Stack Overflow was to instantiate a bean of type ConcurrentKafkaListenerContainerFactory:

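    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        @Nonnull ConsumerFactory<String, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // Use the injected consumer factory and ask for 3 consumer threads
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);

        return factory;
    }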

I am not keen on instantiating my own beans to configure things that seem this obvious. It is easy to overwrite Spring defaults that I am already relying on, and it is more code to maintain...

There has to be a way through configuration.

Option 2: use configuration

Getting back to the configuration: what we write under the binding's consumer key ends up in the Kafka configuration. So I tried setting the concurrency property. That is:
      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group

Starting our application, we see that we now have 3 consumer threads, each with its own partition assigned:

    December 17th 2019, 14:22:57.274    2019-12-17 13:22:57.274  INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-1]
    December 17th 2019, 14:22:57.259    2019-12-17 13:22:57.259  INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-2]
    December 17th 2019, 14:22:57.256    2019-12-17 13:22:57.256  INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-3]
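
What those three threads buy you can be sketched in plain Java. This is only an illustration under my own assumptions: the class, the method, and the in-memory "partitions" are hypothetical, and no Kafka or Spring classes are involved. Each partition is handled by a single task, so events stay ordered within a partition while different partitions are processed in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer threads seen in the log above.
class ConcurrentConsumersSketch {

    // Submits one task per partition to a pool of `concurrency` threads and
    // returns, per partition, the events in the order its task handled them.
    static Map<Integer, List<String>> consume(Map<Integer, List<String>> partitions,
                                              int concurrency) {
        Map<Integer, List<String>> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (Map.Entry<Integer, List<String>> partition : partitions.entrySet()) {
            pool.submit(() -> {
                List<String> seen = new ArrayList<>();
                for (String event : partition.getValue()) {
                    seen.add(event); // "your stuff" would go here
                }
                handled.put(partition.getKey(), seen);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> topic = Map.of(
                1, List.of("a1", "a2"),
                2, List.of("b1", "b2"),
                3, List.of("c1"));
        System.out.println(consume(topic, 3).get(1)); // [a1, a2]
    }
}
```

If `concurrency` is lower than the number of partitions, the extra partitions simply wait for a free thread, which is roughly what the single-threaded default felt like.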
