DEV Community

loading...
Cover image for Configurando o Producer Kafka para enviar headers com Go e Sarama

Configurando o Producer Kafka para enviar headers com Go e Sarama

diogodantas profile image Diogo Ventura Dantas ・4 min read

Recentemente tenho estudado e trabalhado bastante com o Apache Kafka. Dentro do ecossistema golang existem diversas bibliotecas disponíveis para realizar essa integração, alguns exemplos são:

Cada uma tem suas particularidades, vantagens e desvantagens. Tenho realizado testes com todas e em um post futuro podemos falar mais sobre cada uma delas. Antes de irmos direto pro código vou falar um pouco sobre os Headers e quando essa funcionalidade foi incluída no Kafka.

Headers

O Kafka é totalmente agnóstico em relação ao conteúdo da mensagem que o Producer envia, ou seja, ele deixa a cargo do usuário a tarefa de enriquecer ou atribuir mais significado a uma mensagem. Uma alternativa para contornar esse problema é utilizar padrões estruturados como JSON ou AVRO onde o usuário é livre para definir os campos necessários e pode incluir metadados facilmente.

Um header é um par (chave,valor) e uma única mensagem pode conter diversos headers. Esse é um conceito encontrado em sistemas de mensagem como JMS e de transporte como TCP e HTTP e eles podem ser utilizados para roteamento, filtros e anotações. O Kafka adicionou suporte a headers a partir da sua versão v0.11.0.0. Podemos utilizar os headers para adicionar informarções extras as mensagens quem podem interessar a diferentes Consumers.

Vamos pro código

Depois dessa pequena introdução vamos ao que interessa! Antes de mais nada, precisamos de um ambiente com o Kafka configurado e executando e da biblioteca instalada no ambiente de desenvolvimento. A instalação é bem simples só precisa executar o seguinte comando:

go get -u github.com/Shopify/sarama

Após a instalação já podemos começar a escrever o código que vai estabelecer a conexão com o Kafka e produzir as mensagens. O primeiro passo é instanciar o objeto de configuração e criar o Producer.

func initProducer() (sarama.SyncProducer, error) {
    // setup sarama log to stdout
    sarama.Logger = log.New(os.Stdout, "", log.Ltime)

    // producer config
    config := sarama.NewConfig()
    config.Producer.Retry.Max = 5
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    config.Version = sarama.V0_11_0_0

    // create producer
    prd, err := sarama.NewSyncProducer([]string{kafkaConn}, config)

    return prd, err
}

Esta etapa pode parecer banal e passar despercebida quando não se tem experiência com a biblioteca. A documentação da biblioteca não deixa claro que precisamos especificar a versão do Kafka que será utilizada e sem essa configuração não iremos conseguir utilizar os headers. No meu primeiro contato com a biblioteca cheguei a perder um bom tempo sem conseguir produzir no tópico por causa dessa configuração. Após gastar algum tempo pesquisando encontrei uma issuer no repositório da biblioteca que me ajudou a entender o problema. Para quem tiver curiosidade deixarei o link para a issuer a seguir:

Header information not received in the consumer #1074

Versions

Sarama Version: v1.16.0 Kafka Version: kafka_2.11-1.0.0.tgz Go Version: go1.10 darwin/amd64

Configuration

I have written a small test reproducer. Please see attached files producer.go and consumer.go.

Logs

The output from running my test producer which has the headers set: 41973 producer.go:24] producer message &{send test message for headers [{[] []} {[116 101 115 116 72 101 97 100 101 114 49] [116 101 115 116 86 97 108 117 101 49]}] 0 0 0001-01-01 00:00:00 +0000 UTC 0 0} 41973 producer.go:27] producer message header key testHeader1, value testValue1

The output from running my test consumer which has zero headers: 41959 consumer.go:25] Message topic: send, partition: 0, offset: 0, key: , value: test message for headers 41959 consumer.go:26] Consumer message header size 0

Problem Description

When sending a message with header from producer side, the header information got lost from the consumer side.

Some information about the project I am working on: In my project, I am using the kafka header to propagate the zipkin tracing information. I am using the sarama library for the producer side and sarama-cluster library for the consumer side.

In the simple reproduce I wrote, I used sarama library for both producer and consumer to rule out potential issues from sarama-cluster library.

For the documentation, the consumer message header is supported for kafka version 0.11+. And I am using Kafka 1.0.0 version which should have the support.

Agora que tudo está configurado podemos escrever o código que vai ser responsável por produzir as mensagens no tópico. A biblioteca do Sarama possui uma estrutura para a mensagem do produtor e do consumidor. Essa estrutura irá receber informações como o tópico para o qual a mensagem será enviada, a chave dessa mensagem, os metadados, os headers e até mesmo dados que serão preenchidos somente quando a mensagem for entregue no barramento Kafka, como o offset e a partição.

func produce(message string, headers map[string]string, producer sarama.SyncProducer) {
    // publish sync
    msg := &sarama.ProducerMessage{
        Topic:   topic,
        Value:   sarama.StringEncoder(message),
        Headers: convertHeaders(headers),
    }
    p, o, err := producer.SendMessage(msg)
    if err != nil {
        fmt.Println("Error publish: ", err.Error())
    }

    fmt.Println("Partition: ", p)
    fmt.Println("Offset: ", o)
}

func convertHeaders(headers map[string]string) []sarama.RecordHeader {
    output := make([]sarama.RecordHeader, 0)
    for key, value := range headers {
        output = append(output, sarama.RecordHeader{
            Key:   []byte(key),
            Value: []byte(value),
        })
    }
    return output
}

Depois de conhecer as especificidades da biblioteca o código acima é bastante simples e de fácil compreensão. Precisamos apenas instanciar uma mensagem e atribuir os valores. Para enviarmos os headers precisamos converter para o padrão da biblioteca e por isso temos a função convertHeaders. Essa função é responsável por preencher a estrutura sarama.RecordHeader que é a modelagem da biblioteca para lidar com essa funcionalidade. Ela é bem simples e como já descrito anteriormente possui apenas uma chave e um valor que é associado a essa chave. Com tudo isso configurado utilizaremos o Producer que foi criado e configurado anteriormente e enviaremos a mensagem.

A biblioteca Sarama é sem dúvidas a mais consolidada e utilizada mas é um pouco mais verbosa e baixo nível, outras bibliotecas adicionam camadas de abstração que facilitam a vida do programador e tornam o utilização mais amigável. Cada biblioteca tem sua particularidade e as escolhas de design trazem consigo um trade off, por ser mais verbosa e com menos abstrações a biblioteca tem uma maior curva de aprendizado mas possibilita maior controle e flexibilidade ao programador, mas isso é um assunto para uma publicação futura. Abaixo deixo o código completo disponível para quem quiser copiar, estudar e realizar seu próprio teste.

Isso é tudo pessoal

Ficou com a alguma dúvida ? Ou sentiu que esqueci de alguma coisa ? Se sinta a vontade para deixar um comentário. Continuarei compartilhando o que estou estudando, aprendendo e aplicando no meu dia a dia.

Discussion (0)

pic
Editor guide