DEV Community

Fabio José for Kafka BR

Posted on

Como implementar Dead-letter Topic com Spring Kafka

Dead-letter Topic, Dead-letter Queue ou em bom e velho português: Tópicos de mensagens não-entregues. São tópicos necessários em sistemas distribuídos onde a comunicação é assíncrona e através de brokers como o Kafka.

Os dados que chegam nestes tópicos passaram por todas as tentativas possíveis para tratamento de erros e já não resta mais nada a ser feito, se não, a intervenção humana. Assim, não será qualquer erro que levará mensagens ou eventos a serem publicados em um tópico dead-letter.

No mundo Kafka um tópico dead-letter é destinado aos registros consumidos, que por algum erro irrecuperável não puderam ser processados com sucesso.


Neste artigo será demonstrada uma abordagem para implementar DLT no Kafka, utilizando Java e Spring. Para os impacientes 🧐, estes são os fontes :

GitHub logo fabiojose / spring-kafka-dlt-ex

Dead-letter Topic com Spring Kafka


Classificar Erros

Antes de escrever qualquer linha de código é necessário classificar os erros e como serão tratados.

Existem dois tipos de erros:

  • recuperáveis
  • não-recuperáveis

Os erros recuperáveis ou que podem ser tratados, são aqueles onde alguma abordagem será empregada para tentar finalizar o fluxo com sucesso.

Por exemplo, no Java, os erros ConnectException e UnknownHostException podem ser recuperados através de retentativas, pois normalmente são causados por instabilidades momentâneas no serviço ou na rede.

Já os não-recuperáveis são aqueles que independente do que seja feito, não será possível finalizar o fluxo com sucesso. Logo, estes são candidatos a seguirem diretamente para o tópico dead-letter. E como exemplo, se você utiliza Avro, o erro AvroMissingFieldException indica a ausência de um campo requerido, não havendo nada a fazer. Portanto será inútil a retentativa, por exemplo.


E além dos erros técnicos, você também deverá classificar seus erros de negócio e escolher uma estratégia para tratá-los.


Bem, agora veremos como implementar DLT para problemas técnicos no Java. E é bem provável que você encontre equivalentes na sua linguagem ou framework.

Recuperáveis

Aqui segue uma lista de erros técnicos recuperáveis que, em nome da simplicidade, serão tratados através de retentativas.

  • ConnectException
  • UnknownHostException
  • ProductNotFoundException: a título de exemplo, este erro de negócio foi definido como tratável. Porque um serviço chamado Catalogo, que faz event-sourcing das mudanças emitidas pelo serviço Produto, ainda não processou o evento com o produto em questão. Mas através da retentativa seu fluxo poderá finalizar com sucesso.

Não-recuperáveis

Estes são alguns dos erros não-recuperáveis, ou seja, se passarem pelo mesmo processo de retentativas somente consumiriam recursos, sem chances de finalizar com sucesso.

Implementação

Existe um ótimo artigo no blog de engenharia do Uber que descreve uma arquitetura para dead-letter. Vale a leitura!

Esta implementação foi dividida em três partes:

  • tópicos
  • construção
  • tratamento

Tópicos

Para cada processo que terá tratamento dead-letter, será criado um tópico com o sufixo -dlt. Tome como exemplo um processo que realiza a reserva de estoque a partir das ordens de compra publicadas no tópico ordem-compra. Então, ao invés de criar um tópico ordem-compra-dlq, será utilizado um nome relevante ao processo que está falhando:

  • reservar-estoque-dlt

E mais os tópicos para retentativas, que devem ser quantos forem necessários até finalmente o registro chegar ao tópico dead-letter. Digamos que serão no máximo quatro retentativas além da inicial:

  • reservar-estoque-retry-1
  • reservar-estoque-retry-2
  • reservar-estoque-retry-3
  • reservar-estoque-retry-4

Os tópicos devem ter características que não interfiram no andamento das retentativas ou na publicação do tópico dead-letter. Um exemplo é a configuração max.message.bytes, que pode acarretar um erro chamado message too large. Portanto os tópicos deverão permitir mensagens maiores, caso contrário também não será possível utilizá-los porque são idênticos ao principal, inclusive com as mesmas limitações.

Construção

A construção foi feita com Spring Kafka porque ele já vem com muitas utilidades para tratamento para dead-letter, o que contribui muito com a produtividade se ela é o foco no seu projeto. Mas nada impede que você escreva a mesma solução com Clientes Kafka no Java ou na sua linguagem do seu projeto.

Para utilizar os recursos destinados a dead-letter, é necessário customizar a fábrica de objetos encarregada de produzir listeners Kafka no Spring.

Assim, a configuração programática foi sub-dividida em três partes:

  • resolver: responsável por determinar o tópico destino do registro que está no contexto do erro.
  • errorHandler: manipulador do erro
  • kafkaListernerContainerFactory: fabrica instâncias que são utilizadas nos métodos anotados com @KafkaListener

E para cada uma das sub-divisões existe uma implementação Main e outra Retry. Como é possível imaginar, uma cuida das configurações para o processamento principal outro para as retentativas.

Main resolver, responsável por determinar qual o tópico destino com base no erro-raiz lançado ao processar o registro consumido do tópico ordem-compra:



@Bean
public BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition >
 mainResolver() {

  return new BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > () {
   @Override
   public TopicPartition apply(ConsumerRecord < ? , ? > r, Exception e) {

    // ####
    // Por padrão, quando é não-recuperável, segue diretamente p/ dead-letter
    TopicPartition result =
     new TopicPartition(dltTopic,
      QUALQUER_PARTICAO);

    // ####
    // Trata-se de um erro recuperável?
    final boolean recuperavel = isRecuperavel(e);
    if (recuperavel) {

     Optional < String > origem =
      topicoOrigem(r.headers())
      .or(() -> Optional.of(NENHUM_CABECALHO));

     // ####
     // Se origem for outro tópico, segue para o primeiro retry
     String destino =
      origem
      .filter(topico -> !topico.matches(retryTopicsPattern))
      .map(t -> retryFirstTopic)
      .orElse(dltTopic);

     result = new TopicPartition(destino, QUALQUER_PARTICAO);
    }

    return result;
   }
  };
 }


Enter fullscreen mode Exit fullscreen mode

Main error handler, que utiliza o Main resolver e é responsável por definir duas configurações essenciais para tratamento dos erros:

  • DeadLetterPublishingRecoverer: inicia o fluxo DLT, que é delegado pelo SeekToCurrentErrorHandler caso as retentativas locais não resolvam o erro.
  • SeekToCurrentErrorHandler: manipula qualquer erro que seja lançado no método anotado com @KafkaListener. Naturalmente, se você capturá-los com catch e não permitir que eles subam na pilha, não será possível tratá-los.


@Bean
public SeekToCurrentErrorHandler mainErrorHandler(
 @Qualifier("mainResolver")
 BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > resolver,
 KafkaTemplate < ? , ? > template) {

 // ####
 // Recuperação usando dead-letter 
 DeadLetterPublishingRecoverer recoverer =
  new DeadLetterPublishingRecoverer(template, resolver);

 // ####
 // Tentar 3x localmente antes de iniciar o fluxo dead-letter
 SeekToCurrentErrorHandler handler =
  new SeekToCurrentErrorHandler(recoverer, RETENTAR_3X);

 // ####
 // Lista das exceções não-recuperáveis, para evitar o retry local
 excecoes.getNaoRecuperavies().forEach(e ->
  handler.addNotRetryableException(e));

 return handler;
}


Enter fullscreen mode Exit fullscreen mode

Main listener factory, encarregado de fabricar os consumidores que processam registros do tópíco ordem-compra.



@Bean
public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, GenericRecord >>
 mainKafkaListenerContainerFactory(
  @Qualifier("mainErrorHandler") SeekToCurrentErrorHandler errorHandler,
  KafkaProperties properties,
  ConsumerFactory < String, GenericRecord > factory) {

  ConcurrentKafkaListenerContainerFactory < String, GenericRecord > listener =
   new ConcurrentKafkaListenerContainerFactory < > ();

  listener.setConsumerFactory(factory);

  // ####
  // Utilizando o mainErrorHandler para tratar os erros
  listener.setErrorHandler(errorHandler);

  // Falhar, caso os tópicos não existam?
  listener.getContainerProperties()
   .setMissingTopicsFatal(missingTopicsFatal);

  // Commit do offset no registro, logo após processá-lo no listener
  listener.getContainerProperties().setAckMode(AckMode.MANUAL);

  // Commits síncronos
  listener.getContainerProperties().setSyncCommits(Boolean.TRUE);

  return listener;
 }


Enter fullscreen mode Exit fullscreen mode

Então, basta anotar o método como segue.

Mas vale ressaltar que o offset sempre deverá ser confirmado, porque todo o fluxo que trata os erros será feito por retentativa e talvez culminando no tópico dead-letter. Analise o seu caso-de-uso e entenda se esta abordagem também se aplicada.



@KafkaListener(
 id = "main-kafka-listener",
 topics = "${app.kafka.consumer.topics}",
 containerFactory = "mainKafkaListenerContainerFactory"
)
public void consume(@Payload ConsumerRecord < String, GenericRecord > record,
 Acknowledgment ack) throws Exception {

 try {

  // #### 
  // Processar
  process(record);

 } finally {

  // ####
  // Sempre confirmar o offset
  ack.acknowledge();
 }

}


Enter fullscreen mode Exit fullscreen mode

Já a configuração para o processamento das retentativas segue moldes similares, com algumas exceções:

  • resolver: retorna qual o próximo tópico na sequência de retentativas ou se é o reservar-estoque-dlt, caso já tenham passadas por todas.
  • errorHandler: nenhuma retentiva local.

Este é o método anotado com @KafkaListener que tratará os tópicos reservar-estoque-retry:



@KafkaListener(
 id = "retry-kafka-listener",
 topicPattern = "${app.kafka.dlt.retry.topics.pattern}",
 containerFactory = "retryKafkaListenerContainerFactory",
 properties = {
  "fetch.min.bytes=${app.kafka.dlt.retry.min.bytes}",
  "fetch.max.wait.ms=${app.kafka.dlt.retry.max.wait.ms}"
 }
)
public void retry(@Payload ConsumerRecord < String, GenericRecord > record,
 Acknowledgment ack) throws Exception {

 try {

  // ####
  // Reprocessar
  process(record);

 } finally {
  // ####
  // Sempre confirmar o offset
  ack.acknowledge();
 }

}


Enter fullscreen mode Exit fullscreen mode
  • topicPattern: consumir todos os tópicos reservar-estoque-retry
  • fetch.min.bytes e fetch.max.wait.ms: utilizados para provocar um certo atraso no consumo dos registros, visto que sem eles o consumo seria praticamente instantâneo.

Por fim, o application.properties será assim:



app.kafka.dlt.retry.topics=4
app.kafka.dlt.retry.topics.pattern=reservar-estoque-retry-[0-9]+
app.kafka.dlt.retry.topic.first=reservar-estoque-retry-1
app.kafka.dlt.topic=reservar-estoque-dlt

# Lista de exceções recuperáveis
app.kafka.dlt.excecoes.recuperaveis[0]=java.net.ConnectException
app.kafka.dlt.excecoes.recuperaveis[1]=java.net.UnknownHostException

# Lista de exceções não-recuperáveis
app.kafka.dlt.excecoes.naoRecuperaveis[0]=org.apache.avro.AvroMissingFieldException
app.kafka.dlt.excecoes.naoRecuperaveis[1]=java.lang.NullPointerException

# Provocar atraso no processmento de retentativa
app.kafka.dlt.retry.max.wait.ms=20000
app.kafka.dlt.retry.min.bytes=52428800


Enter fullscreen mode Exit fullscreen mode

Todo a implementação está disponível no Github, clone it and have fun!

GitHub logo fabiojose / spring-kafka-dlt-ex

Dead-letter Topic com Spring Kafka

spring-kafka-dlt-ex

Dead-letter Topico com Spring Kafka e formato de dados Avro.

Requerimentos

  • JDK 11
  • Apache Maven 3.6+
  • Docker 19+
  • Acesso ao repositório https://repo.maven.apache.org/maven2/ ou uma alternativa com acesso às dependências presentes no pom.xml
  • Schema Registry
  • Kafka
  • Testcontainers
  • Lombok
  • Commons Lang3

Configurações

Não se preocupe, pois apesar de existirem atalhos pelas variávies de ambiente, você pode utilizar tranquilamente aquilo que o Spring Boot oferece. Então veja todos as propriedades no application.properties

No caso do Kafka, utilizamos Spring Kafka, então você utilizar o modo Spring para configurações.

Variáveis de Ambiente

  • APP_KAFKA_CONSUMER_TOPICS: tópicos para consumir, ou expressão.

  • KAFKA_CLIENT_ID: nome do cliente Kafka, usado pelos brokers para logs e métricas. Utilize um nome clean, não genérico.

    • spring.kafka.producer.client-id, spring.kafka.consumer.client-id
  • KAFKA_CONSUMER_GROUP: nome do grupo de consumo que esta aplicação pertence

    • spring.kafka.consumer.group-id
  • KAFKA_BOOTSTRAP_SERVERS: lista de brokers para o cluster Kafka

    • spring.kafka.bootstrap-servers
  • SCHEMA_REGISTRY_URL: url para o registro de esquemas Avro

    • spring.kafka.properties.schema.registry.url

Quando o registro atinge o último tópico de retentativa, que neste caso é o reservar-estoque-4 e não seja processado com sucesso, finalmente ele será publicado no tópico dead-letter, então a equipe deverá estar preparada para o tratamento adequado.

Tratamento

Bem, neste momento o registro com problemas já desembarcou no tópico reservar-estoque-dlt e bem antes disso acontecer um sistema preciso de monitoramento dos tópicos deveria ter alertado sobre o uso dos tópicos para retentativas, principalmente se os registros estão atingindo o reservar-estoque-retry-4.

Veja neste artigo como monitorar seu cluster Kafka.

A maneira mais prudente de tratamento, além do monitoramento e um ótimo sistema de rastreamento distribuído, será construir um processo em que para cada registro publicado no DLT, tickets e notificações ChatOps deveram ser enviados para a equipe responsável.

Também são necessárias ferramentas adequadas para, por exemplo, editar registros e colocá-los novamente no tópico original.


Bem, é isso! Até o próximo.

Top comments (5)

Collapse
 
miguelgarciath profile image
miguel.garcia

Excelente artigo, Fábio! Foi muito útil para retry queues! No entanto, tenho uma dúvida: estou a tentar usar topicPattern para as queues normais mas não consigo. Além da prop no ficheiro de props e do @KafkaListener é preciso mais alguma alteração?
Obrigado!

Collapse
 
fabiojose profile image
Fabio José

Obrigado Miguel. Me envie o erro, a versão do Kafka e pattern que você está usando. Assim consigo te dizer se existe alguma inconsistência.

Collapse
 
miguelgarciath profile image
miguel.garcia

Obrigado, José! Depois consegui resolver, infelizmente não me lembro qual foi o problema, mas é possível que tenha sido um "escape" qualquer na expressão. Obrigado!

Collapse
 
fabriciolfj profile image
fabriciolfj

Parabéns post muito bom.

Collapse
 
fabiojose profile image
Fabio José

Obrigado Fabricio!