Introdução:
O Apache Kafka é uma plataforma de streaming distribuída que permite o envio e a recepção de fluxos de eventos em tempo real. No contexto do Kafka, um Producer é responsável por enviar mensagens para um ou mais tópicos, enquanto um Consumer é responsável por receber essas mensagens e processá-las.
Neste tutorial, vamos explorar como configurar um cluster do Kafka usando Docker Compose e desenvolver um Consumer e um Producer usando a linguagem de programação Go (Golang). Essa combinação é ideal para construir aplicações escaláveis e de alto desempenho que podem se beneficiar do poder e da flexibilidade do Kafka.
Pré-requisitos:
Antes de começar, certifique-se de ter os seguintes itens instalados em sua máquina:
- Docker: https://docs.docker.com/get-docker/
- Docker Compose: https://docs.docker.com/compose/install/
- Go (Golang): https://golang.org/doc/install
Configurando o Kafka com Docker Compose:
Comece criando um arquivo chamado docker-compose.yml
no diretório de sua escolha e adicione o seguinte conteúdo:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
Neste arquivo, estamos configurando dois serviços: o ZooKeeper, que é necessário para o funcionamento do Kafka, e o Kafka em si. A porta 2181 é usada para se conectar ao ZooKeeper e a porta 9092 para se conectar ao Kafka. Note que definimos a propriedade KAFKA_ADVERTISED_LISTENERS
para que o Kafka seja acessível tanto dentro do contêiner (PLAINTEXT://kafka:9092
) quanto fora (PLAINTEXT_HOST://localhost:9092
).
Para iniciar o cluster do Kafka, execute o seguinte comando na raiz do seu projeto:
docker-compose up -d
O papel do ZooKeeper no Kafka
O ZooKeeper é um serviço centralizado usado pelo Kafka para coordenar e gerenciar os nós do cluster. Ele é responsável por eleger um líder para cada partição do tópico, manter informações de configuração e status dos nós, além de auxiliar na detecção de falhas e no processo de failover.
No nosso exemplo, configuramos o ZooKeeper como um serviço separado no Docker Compose. O Kafka depende do ZooKeeper para registrar e manter as informações do cluster, garantindo que todos os nós estejam sincronizados.
Configurando o ambiente Go
Agora que o Kafka está em execução, vamos configurar nosso ambiente Go para desenvolver o Consumer e o Producer. Abra um terminal e crie um novo diretório para o projeto. Dentro dele, inicialize um módulo Go executando o seguinte comando:
go mod init kafka-tutorial
Isso criará um arquivo go.mod
para gerenciar as dependências do projeto.
Vamos usar a lib Sarama
A lib Sarama
é uma biblioteca cliente para interagir com o Apache Kafka em Go. Ela fornece uma API fácil de usar para criar produtores e consumidores de mensagens Kafka, bem como administrar tópicos, partições e offsets.
Para instalá-la, execute o seguinte comando:
go get github.com/Shopify/sarama
Criando o Consumer e o Producer:
Agora vamos criar os arquivos consumer.go e producer.go para implementar o Consumer e o Producer, respectivamente.
No consumer.go, adicione o seguinte código:
package main
import (
"fmt"
"log"
"github.com/Shopify/sarama"
)
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln("Failed to start consumer:", err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln("Failed to close consumer:", err)
}
}()
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetOldest)
if err != nil {
log.Fatalln("Failed to start partition consumer:", err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln("Failed to close partition consumer:", err)
}
}()
for message := range partitionConsumer.Messages() {
fmt.Printf("Received message: Key = %s, Value = %s\n", string(message.Key), string(message.Value))
}
}
No producer.go, adicione o seguinte código:
package main
import (
"log"
"github.com/Shopify/sarama"
)
func main() {
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln("Failed to start producer:", err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln("Failed to close producer:", err)
}
}()
message := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatalln("Failed to send message:", err)
}
log.Printf("Message sent! Partition = %d, Offset = %d\n", partition, offset)
}
No código acima, estamos no Consumer se conectando ao Kafka e consumindo mensagens do tópico "test-topic" a partir do offset mais antigo. No Producer, estamos enviando uma mensagem com o valor "Hello, Kafka!" para o tópico "test-topic".
Executando o Consumer e o Producer
Para executar o Producer, abra um terminal, navegue até o diretório do projeto e execute o seguinte comando:
go run producer.go
Para executar o Consumer, abra outro terminal, navegue até o diretório do projeto e execute o seguinte comando:
go run consumer.go
Agora você verá o Consumer recebendo a mensagem enviada pelo Producer.
Conclusão:
Neste tutorial, aprendemos como configurar um cluster do Kafka usando Docker Compose e como desenvolver um Consumer e um Producer usando a linguagem Go. O Kafka é uma poderosa plataforma de streaming que pode ser integrada a várias aplicações para processar eventos em tempo real. Com essa combinação, você está pronto para construir aplicações escaláveis
Top comments (2)
Parabéns pelo post! Excelente tutorial.
Surgiu uma dúvida para alguns casos de usos: Além do offset mais antigo, é possível definir um offset específico para o Consumer começar a consumir as mensagens?
Existe sim na lib do sarama tem como você pegar o offset mais recente sarama.OffsetNewest ou passar o offset na mão
Porém vai se você optar por passar um offset específico, lembre-se de que será responsabilidade do consumidor controlar e gerenciar o progresso dos offsets. Você precisará acompanhar manualmente o offset atualizado e garantir que ele seja salvo corretamente para continuar a partir desse ponto em futuras leituras.