DEV Community

Cover image for Kafka: consumindo registros com Spring
Fabio José for Kafka BR

Posted on • Edited on

Kafka: consumindo registros com Spring

O consumo de dados do Kafka, dependendo do caso de uso, requer alguns cuidados no commit. Fazê-lo de forma correta é importante para garantir que nada seja perdido.

Com Clientes Kafka, em especial a versão para Java, é muito simples e intuitivo, mas no Spring Kafka é necessária atenção com ajustes especiais.


O foco neste artigo é mostrar como consumir dados com Spring Kafka, que é uma abstração sobre os Clientes Kafka para Java. Fazendo isso com seguindo setup:

  • enable.auto.commit=false. Que requer commit manual
  • commit síncrono

Devido a semântica de entrega at-least-once, um registro poderá ser consumido uma ou n vezes, e este setup busca reduzir isso.


Um consumidor típico no Spring Kafka é escrito assim:



@Component
public class SpringKafkaListener {
  @KafkaListener(topics = "topico")
  public void consume(String valor) {
    // Processar valor do registro
  }
}


Enter fullscreen mode Exit fullscreen mode

E criado com as seguintes configurações, feitas no application.properties:



spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


Enter fullscreen mode Exit fullscreen mode

Felizmente o Spring Kafka não redefine valores, portanto a configuração padrão é mantida assim como definida na documentação oficial, porém, nela o commit é automático e assíncrono. Contudo, no Spring Kafka, todas as configurações necessárias para commit manual e síncrono não estão disponíveis através de propriedades no application.properties.

Resolvendo

Spring Kafka é uma abstração, logo o poll loop e commit são transparentes. E como pode-se ver no exemplo, um consumidor recebe apenas o registro e por padrão não tem acesso ao Consumer.

Primeiro é necessário revisar as configurações para desligar o commit automático.

Nova configuração:



# Nada de novo aqui
spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Desliga o commit automático no Cliente Kafka
spring.kafka.consumer.enable-auto-commit=false


Enter fullscreen mode Exit fullscreen mode

Spring tem sua própria notação para a maioria das configurações presentes no Kafka Consumer, que são traduzidas em tempo de execução para o nome correto.

Agora que o commit automático foi desligado, são necessários alguns ajustes programáticos feitos ao customizar as fábricas de objetos:

  • ackMode para MANUAL
  • syncCommits como true


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@EnableKafka
@Configuration
public class KafkaConfig {

  @Autowired
  KafkaProperties properties;

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(
              properties.buildConsumerProperties());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
      kafkaListenerContainerFactory() {

      ConcurrentKafkaListenerContainerFactory<String, String> listener = 
            new ConcurrentKafkaListenerContainerFactory<>();

      listener.setConsumerFactory(consumerFactory());

      // Não falhar, caso ainda não existam os tópicos para consumo
      listener.getContainerProperties()
          .setMissingTopicsFatal(false);

      // ### AQUI
      // Commit manual do offset
      listener.getContainerProperties().setAckMode(AckMode.MANUAL);

      // ### AQUI
      // Commits síncronos
      listener.getContainerProperties().setSyncCommits(Boolean.TRUE);

      return listener;
    }
}


Enter fullscreen mode Exit fullscreen mode

Então o consumidor com Spring Kafka terá esta aparência:



import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class SpringKafkaListener {

  @KafkaListener(topics = "topico")
  public void consume(@Payload String valor, Acknowledgment ack) {

    //TODO Processar registro
    // . . . 

    // Commmit manual, que também será síncrono
    ack.acknowledge();

  }
}


Enter fullscreen mode Exit fullscreen mode

Note que mesmo assim não existe acesso ao Consumer, ao invez disso, o Spring injeta uma instância de Acknowledgment que faz o commit quando tem seu método acknowledge() executado.

Também neste exemplo o offset é confirmado a cada registro processado. Isso é algo que degrada a taxa de transferência, mas reduz ainda mais as chances de consumos duplicados. Bem, mas cada caso é um caso 😊.

O exemplo completo está disponível no Github:

GitHub logo fabiojose / skc-ex

Spring Kafka Consumer

Spring Kafka Consumer Example

Exemplo de consumer com Spring Kafka

Requerimentos

Build & Run

Maven

Para montar o fatjar, execute o comando:

Linux

./mvnw clean package
Enter fullscreen mode Exit fullscreen mode

Windows

.\mvnw.cmd clean package
Enter fullscreen mode Exit fullscreen mode

Para executar:

Você pode utilizar o docker-compose.yaml para subir um Kafka em sua máquina

java \
  -Dspring.kafka.bootstrap-servers='localhost:9092' \
  -Dspring.kafka.consumer.client-id='spring-kafka-ex' \
  -Dspring.kafka.consumer.group-id='meu-grupo' \
  -jar target/app-spring-boot.jar
Enter fullscreen mode Exit fullscreen mode

Docker

A definição Dockerfile desta aplicação emprega multi-stage builds. Isso significa que nela acontece o build da aplicação e a criação da imagem.

Se for necessário somente a criar a imagem, pode-se utilizar a definição Dockerfile-image. Mas antes é necessário montar o fatjar através do maven.

Para build do fatjar e montar a imagem, execute o comando:

docker build . -t sk-consumer-ex:1.0
Enter fullscreen mode Exit fullscreen mode

Para montar apenas a imagem (antes…

Photo by Paweł Czerwiński on Unsplash

Top comments (3)

Collapse
 
terrydeman profile image
TerryDeman

Simple, understandable but logical code Thank you for the visit, thank you also have provided the information we really need, do not forget to share information and visit our page thank you.
how to curse your ex lover

Collapse
 
lhmzhou profile image
Linda Zhou

Obrigado por compartilhar!

Collapse
 
fabiojose profile image
Fabio José

Obrigado!