Testing Micronaut Kafka

philhardwick profile image Phil Hardwick Originally published at blog.wick.technology on ・2 min read

When testing Kafka in Micronaut you can use embedded Kafka or use Testcontainers.


To use embedded Kafka set these properties in a src/test/resources/application-kafka.yaml file:

    servers: localhost:${random.port}
    enabled: true
    - topic1
    - topic2

When you set up a test with @MicronautTest(environments = "kafka") it will set the environment as both “kafka” and “test” which will enable all the properties needed to start the embedded Kafka.

It’s best to start the embedded Kafka on a random port to avoid clashing with another Kafka which may be running locally (such as in docker).

All the topics required by your application also need to be specified under the kafka.embedded.topics key so they’re created in the embedded Kafka.

Producing test messages

To set up a producer to send test messages to your application, create a file in src/test/java/ called TestProducer.java and create an interface inside with the needed annotations:

public interface TestProducer {
    void sendTestMessage(String body);

Then use the test producer by injecting it into your test:

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    public TestProducer topicClient;

    public void listensToMessages() {
        // When
        // Then assertions...
        // assertThat()...

Consuming messages from the application under test

Set up a consumer in your test classes called TestListener with the following code:

@KafkaListener(groupId = "test", clientId = "test-consumer")
public class TestListener {

    private BlockingQueue<String> messages = new LinkedBlockingDeque<>();

    public void eventOccurred(String body) {

    public BlockingQueue<String> getMessages() {
        return messages;

This allows you to get the messages that were sent to the topic and then check the contents. It also copies the approach taken by Spring Cloud Stream of storing the messages in a BlockingQueue so you can easily wait for a message to arrive and then remove it from the data structure when you retrieve it.

Using it in your test requires you to inject it:

import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import technology.wick.blog.consumer.TestListener;
import technology.wick.blog.producer.TestProducer;

import javax.inject.Inject;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    public TestListener topicListener;
    public TestProducer topicClient;

    void setUp() {
        // Given no messages exist

    public void producesMessages() throws InterruptedException {
        // When
        // Then
        String bodyOfMessage = topicListener.getMessages().poll(2, TimeUnit.SECONDS);


Micronaut’s approach to testing is simple but functional, there aren’t many test-only features - it requires you to use the same code you would in the real application, so it’s best to approach building a test like you would coding features in the full application.

A working example of using this code in tests can be found on Github.


Editor guide