DEV Community

Rafael Berçam
Rafael Berçam

Posted on

5 1 1 1 1

🧪 Validando Mensagens no Kafka com Kotlin e Awaitility

Fala pessoal! Neste post, vou te guiar por um exemplo prático de como testar a publicação de mensagens em um tópico Kafka usando Kotlin, Awaitility e o KafkaConsumer. Esse tipo de validação é essencial em sistemas distribuídos para garantir que as mensagens sejam enviadas e recebidas corretamente.

📚 Estrutura do Projeto

A estrutura básica do projeto é assim:

.
├── src
│   ├── main
│   │    └── kotlin
│   │         └── api
│   │              └── KafkaProducerService.kt
│   └── test
│        └── kotlin
│             └── api
│                  └── KafkaApiTest.kt
└── pom.xml (ou build.gradle.kts para Kotlin)
Enter fullscreen mode Exit fullscreen mode

Certifique-se de incluir as seguintes dependências no seu pom.xml (Maven) ou build.gradle.kts (Gradle):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.2.0</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-kotlin</artifactId>
    <version>2.16.0</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

🤔 O Que é o Awaitility?

O Awaitility é uma biblioteca Java/Kotlin projetada para facilitar a espera por condições assíncronas em testes. Em vez de usar Thread.sleep() (o que é ineficiente), o Awaitility permite esperar de forma mais inteligente até que uma condição seja atendida.

✅ Por que usar Awaitility com Kafka?

  • Os consumidores Kafka não recebem mensagens imediatamente (processamento assíncrono).

  • Precisamos esperar até que a mensagem seja publicada no tópico e lida pelo consumidor.

📊 Exemplo Prático: Validando Transações Bancárias

Vamos criar um teste que:

  1. Publica uma mensagem de uma transação bancária em um tópico Kafka.

  2. Consome a mensagem do tópico.

  3. Valida se o conteúdo recebido está correto.

🛠️ O Modelo de Transação

data class TransacaoBancaria @JsonCreator constructor(
@JsonProperty("idTransacao") val idTransacao: String,
@JsonProperty("tipo") val tipo: String,
@JsonProperty("valor") val valor: Double,
@JsonProperty("contaOrigem") val contaOrigem: String,
@JsonProperty("contaDestino") val contaDestino: String,
@JsonProperty("dataHora") val dataHora: String
) {
companion object {
fun criarAleatoria(): TransacaoBancaria {
val tipos = listOf("TRANSFERENCIA", "PAGAMENTO", "DEPOSITO")
return TransacaoBancaria(
idTransacao = UUID.randomUUID().toString(),
tipo = tipos.random(),
valor = (100..10000).random().toDouble(),
contaOrigem = "${(100000..999999).random()}-${(0..9).random()}",
contaDestino = "${(100000..999999).random()}-${(0..9).random()}",
dataHora = java.time.LocalDateTime.now().toString()
)
}
}
}
view raw KafkaApiTest.kt hosted with ❤ by GitHub

📬 O Teste com Kafka e Awaitility

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaApiTest {
private lateinit var kafkaProducer: KafkaProducerService
private lateinit var kafkaConsumer: KafkaConsumer<String, String>
private val topic = "transacoes-bancarias"
private val objectMapper = ObjectMapper() // Para serializar e desserializar JSON
@BeforeAll
fun setup() {
kafkaProducer = KafkaProducerService(topic)
// Configuração do consumidor Kafka
val props = Properties().apply {
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
put(ConsumerConfig.GROUP_ID_CONFIG, "grupo-de-teste")
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Lê desde o início do tópico
}
kafkaConsumer = KafkaConsumer<String, String>(props)
kafkaConsumer.subscribe(listOf(topic))
}
@AfterAll
fun tearDown() {
kafkaProducer.close()
kafkaConsumer.close()
}
@Test
fun `deve enviar e validar transacao bancaria no Kafka`() {
val key = UUID.randomUUID().toString()
// Criação da transação bancária
val transacao = TransacaoBancaria.criarAleatoria()
// Serializa a transação em JSON
val message = objectMapper.writeValueAsString(transacao)
// Envia a transação para o Kafka
kafkaProducer.sendMessage(key, message)
// Aguarda e valida se a mensagem foi recebida no Kafka
await().atMost(Duration.ofSeconds(30)).untilAsserted {
val registros = kafkaConsumer.poll(Duration.ofMillis(500))
assertTrue(
registros.any {
it.key() == key && validaMensagem(it.value(), transacao)
},
"A transação não foi encontrada ou está incorreta no Kafka"
)
}
}
// Função auxiliar para validar a mensagem recebida
private fun validaMensagem(receivedMessage: String, expected: TransacaoBancaria): Boolean {
return try {
val transacaoRecebida = objectMapper.readValue(receivedMessage, TransacaoBancaria::class.java)
assertEquals(expected.idTransacao, transacaoRecebida.idTransacao, "ID da transação incorreto")
assertEquals(expected.tipo, transacaoRecebida.tipo, "Tipo de transação incorreto")
assertEquals(expected.valor, transacaoRecebida.valor, "Valor da transação incorreto")
assertEquals(expected.contaOrigem, transacaoRecebida.contaOrigem, "Conta de origem incorreta")
assertEquals(expected.contaDestino, transacaoRecebida.contaDestino, "Conta de destino incorreta")
assertEquals(expected.dataHora, transacaoRecebida.dataHora, "Data e hora incorretas")
true
} catch (e: Exception) {
println("Erro ao validar mensagem: ${e.message}")
false
}
}
}
view raw KafkaApiTest.kt hosted with ❤ by GitHub

🧐 Explicando o Teste

  1. Produzimos uma mensagem Kafka com kafkaProducer.sendMessage().

Tela do IntelliJ com teste executado com sucesso

  1. Consumimos com kafkaConsumer.poll().

  2. Usamos o await().untilAsserted para esperar até a mensagem ser validada.

Se a mensagem não for encontrada ou os dados estiverem incorretos, o teste falha com uma mensagem de erro clara. ✅

Tela do Docker com o tópico da transação feita pelo teste

📢 Conclusão

Testar mensagens Kafka de forma assíncrona é essencial para garantir a integridade do sistema. Usando o Awaitility com KafkaConsumer, conseguimos validar mensagens de forma eficiente.

Se você gostou do conteúdo ou tem dúvidas, deixe um comentário! 🚀

👉 Me siga no dev.to para mais conteúdo de qualidade!

Happy coding! 💻

🔗 Links Referencias e Projeto no GitHub

AWS Security LIVE!

Join us for AWS Security LIVE!

Discover the future of cloud security. Tune in live for trends, tips, and solutions from AWS and AWS Partners.

Learn More

Top comments (0)