DEV Community

Cover image for Recebendo Mensagens
Victor Osório
Victor Osório

Posted on • Edited on

Recebendo Mensagens

Ao iniciar o post Enviando Mensagens, falei que enviar é simples. Se você não achou simples, depois de ver a complexidade de receber uma mensagem verá que sim, é simples!

Parâmetros comuns

Para configurar um consumer são necessários mais parâmetros. Os parâmetros abaixo são similares ao do producer com excessão da troca de Serializador por Desserializador, visto que a mensagem deve ser agora transformada de byte[] para um objeto.

  • BOOTSTRAP_SERVERS: Conjunto de pares IP:PORTA para acessar o cluster. Por exemplo, kafka-1:9092,kafka-2:9092 irá acessar o cluster que contem os duas instâncias kafka-1:9092 e kafka-2:9092
  • KEY_DESERIALIZER_CLASS: É a classe que fará a desserialização da Chave. O que é e como é usada a chave, vamos explicar posteriormente. Deve ser uma implementação da interface org.apache.kafka.common.serialization.Deserializer.
  • VALUE_DESERIALIZER_CLASS: É a class que fará a desserialização do Valor. Não há mensagem sem um valor. Deve ser uma implementação da interface org.apache.kafka.common.serialization.Deserializer.

Dados os parametros abaixo, agora é preciso configurar o GROUP_ID. Antes de escolher um, é necessário pensar em como será a leitura dessa mensagem.

Read once

Um Producer, escreve uma mensagem por tópico. Ele não se preocupa como essa mensagem será lida. Já o Consumer deve se preocupar para que a mensagem não apenas uma vez, não menos que isso.

No Kafka, uma mensagem é recebida apenas uma vez por um Consumer de um GROUP_ID. Isso que dizer que, se dois processos diferentes tiverem consumers com o mesmo GROUP_ID, essa mensagem será consumida por apenas um.

Essa funcionalidade pode ajudar muito, mas pode também atrapalhar. O cuidado que deve ser tomado é:

  • Para cada tipo de consumer, sempre escolher um GROUP_ID por tipo
  • Tratar as mensagens como idempotentes

Instanciando um Consumer

Para instanciar um consumer, então, são necessários os 4 parâmetros: BOOTSTRAP_SERVERS, KEY_DESERIALIZER_CLASS, VALUE_DESERIALIZER_CLASS e GROUP_ID.

package io.vepo.kafka.articles;
import static java.util.Arrays.asList;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class SimpleConsumer {
public static void main(String[] argv) throws Exception {
if (argv.length != 1) {
System.err.println("Please specify 1 parameters: topic-name");
System.exit(-1);
}
String topicName = argv[0];
System.out.println("Reading messages from Topic: " + topicName);
// Configure the Consumer
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "SimpleConsumerGroup");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (Consumer<String, String> consumer = new KafkaConsumer<>(configProperties)) {
consumer.subscribe(asList(topicName));
mainLoop: while (true) {
for (ConsumerRecord<String, String> message : consumer.poll(Duration.ofSeconds(1))) {
System.out.println("Message received from partition=" + message.partition() + " with offset="
+ message.offset());
System.out.println("Key=" + message.key() + "\t value=" + message.value());
if ("exit".equals(message.value())) {
break mainLoop;
}
}
}
}
System.out.println("Exiting...");
}
}

Pronto, criado o Consumer, deve-se fazer o pull e processar as mensagens.

No exemplo acima, há uma condição de saida, mas isso é apenas para exemplo. Normalmente um programa que processas mensagens opera até ser interrompido.

Conclusão

Criar um Consumer para um Tópico Kafka é simples, não tão simples quanto um Producer. Deve-se atentar para quantos outros consumers vão concorrer pelas mensagens, se uma mensagem deve ser lida por um ou mais consumer. E ainda deve-se tratar a mensagem como se ela fosse repetida.

Ficou com dúvida... alguns pontos ainda serão tradados. Mas pode perguntar!

Top comments (0)