Quarkus, WebSockets and Kafka

Anthony Ikeda ・7 min read

Previously, we looked at how to quickly get Quarkus up and running and create a basic UI that sends messages back and forth between the client and the server. In this article we take it to the next level: we introduce Kafka as a messaging platform and push incoming messages from a topic directly to the user interface.

There is no true session management in this article; we can cover that in the future. It does, however, demonstrate how easy it is to manage some basic users and broadcast to them all.

Getting Kafka Up and Running

For this to work, we are going to need a Kafka instance up and running, so we will start with that.

These are the requirements for this article:

  • Java 11
  • Apache ZooKeeper
  • Kafka 2.3.0
  • The source code will be on this branch

We will refer to the location where you unzipped Kafka as KAFKA_HOME.

Starting ZooKeeper

Once you've downloaded ZooKeeper, unzip it to a directory and ensure Java 11 is the current JDK.

Next, we want to create a conf/zoo.cfg file with the following properties:

conf/zoo.cfg

tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=60

You can configure the dataDir to any location as long as the server can write to that directory. You can then start ZooKeeper with:

$ bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED

Next we will set up Kafka.

Kafka Up and Running

To get Kafka running we first need to make sure we have Java 11 set as the JDK.

Next start up Kafka with:

$ bin/kafka-server-start.sh config/server.properties
INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
INFO starting (kafka.server.KafkaServer)
[2020-09-08 19:04:53,486] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=14.0.2 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
...
INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
INFO Loading logs. (kafka.log.LogManager)
INFO Logs loading complete in 10 ms. (kafka.log.LogManager)
INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)

There will be a lot of messages, but the most important one shows the listener that was started: EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)

This indicates that we can connect to Kafka over an unsecured connection on port 9092.
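If you would rather confirm the listener from code than from the log, a plain TCP connection attempt is enough for a first check. The following is a minimal sketch using only the JDK; the class and method names (BrokerPortCheck, isListening) are made up for illustration and are not part of the article's project. A successful connection only shows that something is accepting connections on the port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for illustration; not part of the article's project. */
final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9092 is the PLAINTEXT listener port reported in the Kafka startup log.
        boolean up = isListening("localhost", 9092, 1000);
        System.out.println("Something listening on localhost:9092? " + up);
    }
}
```

If this prints false while the broker log shows the listener, check that you are running it on the same machine as the broker.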

Create our Topic

We need to manually create a topic that we can read from and write to. Open a terminal, navigate to the KAFKA_HOME directory and execute the following command:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic chat-messages --partitions 1 --replication-factor 1
Created topic chat-messages.

This will create a new topic for us called chat-messages.

Updating the WebSocket API

In order to continue, we will need some more dependencies in our WebSocket API to connect to Kafka.

  • io.quarkus:quarkus-kafka-streams
  • org.testcontainers:testcontainers
  • org.testcontainers:kafka

Update the pom.xml dependencies with:

pom.xml

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.14.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.14.3</version>
    <scope>test</scope>
</dependency>

Configure the Kafka Connection

Next we want to make sure the application is configured to connect to our Kafka server. Open src/main/resources/application.properties and make the following changes:

quarkus.kafka-streams.application-server=localhost:8011
quarkus.kafka-streams.application-id=${quarkus.application.name}
quarkus.kafka-streams.bootstrap-servers=${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
quarkus.kafka-streams.topics=chat-messages

For the Kafka host, we use the KAFKA_HOST environment variable with a fallback of localhost; for the port, we use the KAFKA_PORT environment variable with a fallback of 9092. We have also set the default topic to chat-messages, which we created earlier.
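To make the fallback behaviour concrete, here is a small sketch of what the ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} expression evaluates to for a given environment. It is a simplified model written against plain JDK maps, not the code Quarkus actually runs, and the class and method names (BootstrapServersSketch, bootstrapServers) are made up for illustration.

```java
import java.util.Map;

/** Hypothetical model of the property expression; not Quarkus's own implementation. */
final class BootstrapServersSketch {

    /** Mirrors ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} for the given environment. */
    static String bootstrapServers(Map<String, String> env) {
        String host = env.getOrDefault("KAFKA_HOST", "localhost"); // fallback: localhost
        String port = env.getOrDefault("KAFKA_PORT", "9092");      // fallback: 9092
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Uses the real process environment; unset variables fall back to the defaults.
        System.out.println(bootstrapServers(System.getenv()));
    }
}
```

With neither variable set, the result is the local broker we started earlier; setting only one of them overrides just that half of the address.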

⚠️ NOTE
If you want to run a quick compile from here, there are a couple of things we need to add to our test configuration:

src/test/resources/application.properties
quarkus.application.name=test-websockets
quarkus.log.category."com.brightfield.streams".level=ALL
quarkus.kafka-streams.topics=chat-messages

Create the Kafka Consumer

To do this, we will update our SocketEndpoint class.

First, let's create a method to broadcast to all users who are connected:

private void broadcast(String message) {
    socketSessions.values().forEach(s -> {
        s.getAsyncRemote().sendText(message, result -> {
            if (result.getException() != null) {
                log.error("Unable to send message: {}", result.getException().getMessage(), result.getException());
            }
        });
    });
}

As you can see, we iterate through the Map of user sessions, indexed by user name, and use each session's async remote endpoint to send the text message to every user.

Next, let's add the consumer. Again in the SocketEndpoint class, add the following code:
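The broadcast pattern itself can be tried out without a WebSocket container. The sketch below is a stand-in, not the article's SocketEndpoint: each session is modelled as a Consumer<String> and sending is synchronous, whereas the real code uses the async remote with a callback. The names BroadcastSketch, register and unregister are hypothetical. What it illustrates is that a failure for one user should not stop delivery to the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for the session registry; a Consumer plays the role of a session. */
final class BroadcastSketch {

    // user name -> something that can deliver a text message to that user
    private final Map<String, Consumer<String>> sessions = new ConcurrentHashMap<>();

    void register(String user, Consumer<String> sender) {
        sessions.put(user, sender);
    }

    void unregister(String user) {
        sessions.remove(user);
    }

    /** Sends the message to every registered user and returns the users whose send failed. */
    List<String> broadcast(String message) {
        List<String> failed = new ArrayList<>();
        sessions.forEach((user, sender) -> {
            try {
                sender.accept(message);
            } catch (RuntimeException e) {
                // Record the failure and keep going so other users still get the message.
                failed.add(user);
            }
        });
        return failed;
    }

    public static void main(String[] args) {
        BroadcastSketch chat = new BroadcastSketch();
        chat.register("alice", msg -> System.out.println("alice received: " + msg));
        chat.register("bob", msg -> { throw new IllegalStateException("connection closed"); });
        System.out.println("failed sends: " + chat.broadcast("hello"));
    }
}
```

Using a ConcurrentHashMap here is a design choice for the sketch: sessions are typically added and removed on connection threads while a broadcast may be iterating on another thread.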

@Produces
public Topology buildTopology() {
    log.info("Building the Topology...");
    StreamsBuilder builder = new StreamsBuilder();

    builder.stream("chat-messages", Consumed.with(Serdes.String(), Serdes.String()))
        .peek((id, message) -> {
            log.info("Incoming transaction: {}", message);
            broadcast(message);
        });
    return builder.build();
}

Here we have specified the topic we want to stream from and used a String Serde for both the key and the value to deserialize each message from the topic. We then log the message and broadcast it to all users connected over the WebSocket.

Updating the Unit Tests

If we try to build the service without a Kafka server running, the tests will hit a wall. If one is running, the unit tests will get stuck because there is no shutdown process in the test. This is where Testcontainers comes into play.

We are going to enhance the unit test we created in the previous article to use a new lifecycle for our test Kafka server.

First we will create our test Kafka instance:

src/test/java/com/brightfield/streams/InfrastructureTestResource.java

package com.brightfield.streams;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class InfrastructureTestResource implements QuarkusTestResourceLifecycleManager {

    private final Logger log = LoggerFactory.getLogger(InfrastructureTestResource.class);
    private final KafkaContainer kafkaContainer = new KafkaContainer("5.5.1");

    @Override
    public int order() {
        return 1;
    }

    @Override
    public void init(Map<String, String> initArgs) {
        log.info("Initialising...");
    }

    @Override
    public Map<String, String> start() {
        log.info("Starting kafka test container...");
        this.kafkaContainer.start();

        log.info("Creating topic...");
        createTopics("chat-messages");
        return configurationParameters();
    }

    @Override
    public void stop() {
        this.kafkaContainer.close();
    }

    private void createTopics(String... topics) {
        var newTopics =
            Arrays.stream(topics)
                .map(topic -> new NewTopic(topic, 1, (short) 1))
                .collect(Collectors.toList());
        try (var admin = AdminClient.create(Map.of("bootstrap.servers", getKafkaBrokers()))) {
            admin.createTopics(newTopics);
        }
    }

    private String getKafkaBrokers() {
        // Host and mapped port of the broker as reachable from the test JVM
        return String.format("%s:%d", kafkaContainer.getContainerIpAddress(), kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT));
    }

    private Map<String, String> configurationParameters() {
        log.info("Returning configurationParameters...");
        final Map<String, String> conf = new HashMap<>();
        String bootstrapServers = getKafkaBrokers();
        log.info("Brokers: {}", bootstrapServers);
        conf.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        conf.put("quarkus.kafka-streams.bootstrap-servers", bootstrapServers);
        conf.put("mp.messaging.outgoing.delivery.bootstrap.servers", bootstrapServers);
        return conf;
    }
}

Next we want our test to use this Resource:

src/test/java/com/brightfield/streams/SocketEndpointTest.java

@QuarkusTest
@QuarkusTestResource(value = InfrastructureTestResource.class)
public class SocketEndpointTest {
...
}

When you compile and run the unit tests, you should now see the WebSocket tests run, connect to the Kafka container, and then disconnect without getting stuck. By creating the InfrastructureTestResource, we have added a lifecycle that manages the Kafka container.

  • First, the init() method is called. In our scenario we are just logging out that the init() method has been called.
  • Next, the start() method is called. It starts the Kafka container, creates the topics we want to use, and then returns the container's configuration.
  • When the tests are complete, the stop() method is called, which calls close() on the Kafka container to clean up and shut it down.
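The order of these calls can be modelled in a few lines of plain Java. This is a simplified sketch of the sequence described above, not how Quarkus is implemented internally: the Resource interface is a hypothetical stand-in that mirrors the three methods we implemented, and runWith is a made-up driver. The try/finally is an assumption of the sketch, showing one way to make sure the container is released even when a test throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the init -> start -> tests -> stop sequence. */
final class LifecycleSketch {

    /** Stand-in mirroring the three lifecycle methods used in the article. */
    interface Resource {
        void init(Map<String, String> initArgs);
        Map<String, String> start();
        void stop();
    }

    /** Calls init and start, runs the tests, then stops the resource; returns start()'s config. */
    static Map<String, String> runWith(Resource resource, Runnable tests) {
        resource.init(Map.of());
        Map<String, String> config = resource.start();
        try {
            tests.run();
        } finally {
            // In this sketch the resource is always released, even if a test throws.
            resource.stop();
        }
        return config;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Resource fakeKafka = new Resource() {
            @Override public void init(Map<String, String> initArgs) { events.add("init"); }
            @Override public Map<String, String> start() {
                events.add("start");
                // A real resource would return the container's actual host and port here.
                return Map.of("quarkus.kafka-streams.bootstrap-servers", "localhost:9092");
            }
            @Override public void stop() { events.add("stop"); }
        };
        Map<String, String> config = runWith(fakeKafka, () -> events.add("tests"));
        System.out.println(events + " with config " + config);
    }
}
```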

⚠️
We won't be implementing full testing of the Kafka integration in this article. What we have for now is sufficient to test from the command line.

Running our Stack

Everything should now be in place. Let's start our service and the Angular client application and see if it works!

Sending some test messages through the web interface should work as it did before.

To test our broadcast capabilities, we will revert to the command line and publish the messages from there.

Access the KAFKA_HOME directory in a terminal window and enter:

$ bin/kafka-console-producer.sh --broker-list=localhost:9092 --topic chat-messages
>Below
>People

You should see the user interface update with the same values.

Conclusion

With this as your base, you can build a straightforward, full-blown messaging tool: listing users, showing their statuses, and even supporting group messages.

In a future article we will explore how to test the Kafka component. In the meantime, happy chatting!
