Marcos Maia


Spring Kafka Streams playground with Kotlin - II


This post is part of a series where we create a simple application with Kotlin, using Spring Boot and Spring Kafka.

Please check the first part of the tutorial to get started and get further context of what we're building.

If you want to start from here, you can clone the source code for this project with git clone, check out v4 with git checkout v4, and continue from there with this post.

In this second part we will build the producers LeveragePriceProducer and StockQuoteProducer and our controller QuotesController, and also use the Kafka Admin client to create the topics for us.

Part II Diagram

We will then be able to send some messages using the REST endpoints we created and check the messages in Kafka using the command line client or Conduktor. Let's go for it.

Setting up Kafka Admin to create topics

In this section we will create the spring-kafka setup so that the Kafka Admin client creates the topics for us on the broker at application startup.

For this sample application I will keep the package structure very simple: we will end up with two subpackages, api, where our REST endpoints will be created, and repository, where we will put all Kafka configuration and Kafka client code. In real projects I usually have other layers and tend to use business-related package paths to separate subdomains, but that is a discussion for another post.

  1. Create a package called repository and a new Kotlin class KafkaConfiguration in this new package.

Visualization of new package structure

Add the Spring Boot @Configuration and Spring Kafka @EnableKafka annotations to your class, create constants with the topic names, and add a Spring @Bean that uses the spring-kafka Admin code to create the two initial topics for leverage and quotes. See the code fragment below. Notice that we create the leverage topic as a Kafka compacted topic; we will later build a Global KTable to read this data. Apart from that, we use the default settings for this simple example.

KafkaConfiguration Kotlin class:

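@Configuration
@EnableKafka
class KafkaConfiguration {

    // TOPICS
    val quotesTopic = "stock-quotes-topic"
    val leveragePriceTopic = "leverage-prices-topic"

    // NewTopics is KafkaAdmin.NewTopics; the leverage topic is compacted, the rest uses defaults
    @Bean
    fun appTopics(): NewTopics? {
        return NewTopics(
            TopicBuilder.name(quotesTopic).build(),
            TopicBuilder.name(leveragePriceTopic).compact().build()
        )
    }
}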

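To make the idea of a compacted topic more concrete, here is a toy model in plain Kotlin. It is not how Kafka implements compaction (which runs in the background on the broker and does not happen immediately); it only sketches the visible end result, where only the latest value per key is eventually retained. The LeverageRecord type, the GOOG symbol and the values are made up for illustration.

```kotlin
// Toy model of log compaction: only the most recent record per key survives.
// LeverageRecord is a hypothetical type used only for this illustration.
data class LeverageRecord(val key: String, val value: Double)

fun compact(log: List<LeverageRecord>): List<LeverageRecord> {
    val latest = LinkedHashMap<String, Double>()
    for (record in log) {
        // Remove first so the surviving record keeps the position of its latest write.
        latest.remove(record.key)
        latest[record.key] = record.value
    }
    return latest.map { (key, value) -> LeverageRecord(key, value) }
}

fun main() {
    val log = listOf(
        LeverageRecord("APPL", 144.54),
        LeverageRecord("GOOG", 2.10),
        LeverageRecord("APPL", 150.00)
    )
    // The older APPL record is dropped, the newer one is kept.
    println(compact(log))
}
```

This "latest value per key" view is what makes a compacted topic a natural fit for a table-like structure such as the Global KTable mentioned above.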

Add the following property to the application.yaml file under src/main/resources. If your project has a properties file there instead, you can simply change its name / extension to yaml.

spring:
  kafka:
    bootstrap-servers: localhost:9092


You may git checkout v5 to check out the code up to this point.

Start a local Kafka on Docker Compose or Kubernetes, and then build and run this application from your IDE or from the command line:

mvn clean package -DskipTests

run it:

mvn spring-boot:run

Now when the application starts you should see the Kafka Admin client configuration info in the logs:

Show logs with Kafka Admin lines

Ok, once the application has started you can check on the Kafka broker that the two topics are now available, thanks to the Admin client code we just created.

Enter the running container with kubectl exec -it kafka-0 -- bash and then list the topics with kafka-topics --list --bootstrap-server kafka:29092. You should see the topics:

Show command line with topics listed by the broker

Use exit to go back to the host command line.

Create Kafka producers

In this next section we will create the Kafka producers and API endpoints to send messages to the topics we just created.

Kafka producers are thread safe, and we could use a single producer for different schema types and topics, but to avoid abstractions in this tutorial we will be explicit and create two producers.

First let's add the generic spring-kafka producer configuration to the application.yaml file. Under spring.kafka add:

    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081


The configurations tell our producer clients where to find Kafka and the Schema registry and the types of the key/value pairs we will be sending to our topics.

Creating the producers:

The producers need to reference the topics they send messages to, so it is better to have a central place to get the topic names from. We just added the topic names to the KafkaConfiguration class, but as regular Kotlin val properties. Let's turn them into constants so the producers can access them easily. Change the KafkaConfiguration class to look like this:

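@Configuration
@EnableKafka
class KafkaConfiguration {
    @Bean
    fun appTopics(): NewTopics {
        return NewTopics(
            TopicBuilder.name(STOCK_QUOTES_TOPIC).build(),
            TopicBuilder.name(LEVERAGE_PRICES_TOPIC).compact().build()
        )
    }
}

// constants for topics
const val STOCK_QUOTES_TOPIC = "stock-quotes-topic"
const val LEVERAGE_PRICES_TOPIC = "leverage-prices-topic"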


Notice that above we moved the constants outside of the class definition.
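The difference between a property inside a class and a top-level constant can be seen in a small standalone sketch. TopicsAsProperties and TopicUser are hypothetical names used only for this illustration; they are not part of the project.

```kotlin
// Top-level constants belong to the file, so any code that can see them
// may use them without creating an instance of some class first.
const val STOCK_QUOTES_TOPIC = "stock-quotes-topic"
const val LEVERAGE_PRICES_TOPIC = "leverage-prices-topic"

// Hypothetical class mirroring the first version of the configuration:
// the topic name is an instance property, so callers need an object to read it.
class TopicsAsProperties {
    val quotesTopic = "stock-quotes-topic"
}

// Hypothetical stand-in for a producer that refers to the constants directly.
class TopicUser {
    fun describe(): String = "sending to $STOCK_QUOTES_TOPIC and $LEVERAGE_PRICES_TOPIC"
}

fun main() {
    println(TopicsAsProperties().quotesTopic) // requires an instance
    println(TopicUser().describe())           // no instance of a configuration class needed
}
```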

Under the repository package create two new Kotlin classes called LeveragePriceProducer and StockQuoteProducer and add the following content to them:

LeveragePriceProducer class:

class LeveragePriceProducer(val leveragePriceProducer: KafkaTemplate<String, LeveragePrice>) {

    fun send(message: LeveragePrice) {
        leveragePriceProducer.send(LEVERAGE_PRICES_TOPIC, message.symbol.toString(), message)
    }
}


StockQuoteProducer class:

class StockQuoteProducer(val quoteProducer: KafkaTemplate<String, StockQuote>) {

    fun send(message: StockQuote) {
        quoteProducer.send(STOCK_QUOTES_TOPIC, message.symbol.toString(), message)
    }
}


That's it for the producers, pretty simple! This is possible because Spring Boot and Spring Kafka apply default values to any configuration you do not specify; if you need to, you can easily override them in the configuration or with Spring @Bean definitions.

Let's now build two REST endpoints so we can send messages to those topics using the producers we just created. To pass in the messages we will also create two wrapper DTOs which will be used to pass in data to our REST endpoints.

Create a package called api and a Kotlin class called QuotesController in it. Using constructor dependency injection, pass in references to our producers. This will be the initial content:

class QuotesController(val stockQuoteProducer: StockQuoteProducer, val leveragePriceProducer: LeveragePriceProducer) {}


Ok, now let's create a new package called dto under the api package we just created, where we will put our simple DTOs, LeveragePriceDTO and StockQuoteDTO, with the following content:

LeveragePriceDTO class:

class LeveragePriceDTO(val symbol: String, val leverage: BigDecimal)

StockQuoteDTO class:

class StockQuoteDTO(val symbol: String, val tradeValue: BigDecimal, @JsonFormat(shape = JsonFormat.Shape.STRING, timezone = "UTC") val isoDateTime: Instant)

This is the package structure after creating these three new classes:

Package structure for Controller and DTOs

Let's now create two endpoints so we can process and send messages to Kafka. Create a new function newQuote inside the QuotesController class:

    fun newQuote(@RequestBody stockQuoteDTO: StockQuoteDTO): ResponseEntity<StockQuoteDTO> {
        val stockQuote = StockQuote(stockQuoteDTO.symbol, stockQuoteDTO.tradeValue.toDouble(), stockQuoteDTO.isoDateTime.toEpochMilli())
        stockQuoteProducer.send(stockQuote) // fire and forget
        return ResponseEntity.ok(stockQuoteDTO)
    }


And for Leverage create a new function newLeveragePrice with the following content:

    fun newLeveragePrice(@RequestBody leveragePriceDTO: LeveragePriceDTO): ResponseEntity<LeveragePriceDTO> {
        val leveragePrice = LeveragePrice(leveragePriceDTO.symbol, leveragePriceDTO.leverage.toDouble())
        leveragePriceProducer.send(leveragePrice) // fire and forget
        return ResponseEntity.ok(leveragePriceDTO)
    }
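Both endpoint functions convert the incoming DTO before producing: the BigDecimal becomes a Double and, for quotes, the Instant becomes epoch milliseconds. The sketch below reproduces that mapping outside of Spring so it can be tried on its own. StockQuoteRecord and its field names are hypothetical stand-ins for the Avro-generated StockQuote class, whose actual field names are not shown in this post, and the Jackson annotation on the DTO is left out.

```kotlin
import java.math.BigDecimal
import java.time.Instant

// Request-side DTO with the same fields as in the post (Jackson annotation omitted).
class StockQuoteDTO(val symbol: String, val tradeValue: BigDecimal, val isoDateTime: Instant)

// Hypothetical stand-in for the Avro-generated StockQuote class.
data class StockQuoteRecord(val symbol: String, val tradeValue: Double, val tradeTime: Long)

// The same conversions the controller applies before calling the producer.
fun toRecord(dto: StockQuoteDTO): StockQuoteRecord =
    StockQuoteRecord(dto.symbol, dto.tradeValue.toDouble(), dto.isoDateTime.toEpochMilli())

fun main() {
    val dto = StockQuoteDTO(
        "APPL",
        BigDecimal("123.45"),
        Instant.parse("2021-11-04T08:22:35.000000Z")
    )
    println(toRecord(dto))
}
```

Keep in mind that converting a BigDecimal to a Double can lose precision for values that a Double cannot represent exactly, which is acceptable for a playground but worth a second look for real prices.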

Now we need to send some messages to these REST endpoints, which will forward them to our Kafka topics. To do that, create a new folder called test-data in the root of the project and add a few JSON files with data for leverage and quotes, like the examples below:

leveragePrice1.json file:

{
  "symbol": "APPL",
  "leverage": 144.54
}

stockQuote1.json file:

{
  "symbol": "APPL",
  "tradeValue": 123.45,
  "isoDateTime": "2021-11-04T08:22:35.000000Z"
}

Note that we will be receiving an ISO date so make sure to follow the pattern when creating new files.
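A quick way to check a timestamp before putting it in a new file is to try parsing it with java.time.Instant, the type used by StockQuoteDTO. This is only a sketch: isIsoInstant is a hypothetical helper, and it assumes the endpoint accepts the same ISO-8601 instant format that Instant.parse does.

```kotlin
import java.time.Instant
import java.time.format.DateTimeParseException

// Hypothetical helper: true when the text parses as an ISO-8601 instant,
// for example 2021-11-04T08:22:35.000000Z.
fun isIsoInstant(text: String): Boolean =
    try {
        Instant.parse(text)
        true
    } catch (e: DateTimeParseException) {
        false
    }

fun main() {
    println(isIsoInstant("2021-11-04T08:22:35.000000Z")) // same pattern as stockQuote1.json
    println(isIsoInstant("2021-11-04 08:22:35"))         // missing the T separator and the Z suffix
    println(isIsoInstant("04/11/2021"))                  // not an ISO date at all
}
```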

The structure after creating a few sample data files should be like this:

Structure with sample files

Now let's create a file called test-data.http on the root of our project where we will have some http calls to invoke our newly created endpoints:

test-data.http file:

POST http://localhost:8080/api/quotes
Content-Type: application/json

< ./test-data/stockQuote1.json

###
POST http://localhost:8080/api/leverage
Content-Type: application/json

< ./test-data/leveragePrice1.json


You can use Postman or convert those requests to curl if you prefer. If you're using IntelliJ, the http files should be recognized automatically and you can run them directly from the IDE. If you're using VSCode, you can install a REST extension that recognizes the http file, so you can also run the requests directly from the editor.

IntelliJ - You can run the calls by clicking the green arrow, as seen below:

IntelliJ http file view

VSCode - You can run the calls by clicking the Send Request text on the http file, as seen below:

VSCode http file view
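If you would rather script the calls than use an IDE or Postman, the same request can be sent from plain Kotlin with the HTTP client that ships with the JDK (11 or later). This is a minimal sketch, not part of the project: jsonPost is a hypothetical helper, and main assumes the application is running on localhost:8080 and that it is started from the project root, so that the test-data folder can be found.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.file.Files
import java.nio.file.Path

// Hypothetical helper: builds a JSON POST request like the ones in test-data.http.
fun jsonPost(url: String, body: String): HttpRequest =
    HttpRequest.newBuilder()
        .uri(URI.create(url))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()

fun main() {
    // Assumes the working directory is the project root and the app is up.
    val body = Files.readString(Path.of("test-data/stockQuote1.json"))
    val request = jsonPost("http://localhost:8080/api/quotes", body)
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println("${response.statusCode()} ${response.body()}")
}
```

Swapping the URL for http://localhost:8080/api/leverage and the file for leveragePrice1.json sends a leverage message instead.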

Send some messages to the endpoints, then let's check the messages on the broker. I will show how to do that using the Kubernetes setup that is in the project. If you're using Docker Compose, please check the article references at the beginning of part I of this series, where it's explained in detail how to do that using Compose.

Checking the messages

First we need to enter the running schema-registry container, so:

  1. List the running pods:

kubectl get pods

You should find the schema-registry pod; copy its name:

Kubernetes running pods

  2. Enter the schema registry container (use the pod name from step 1):

kubectl exec -it schema-registry-54bd4d4b4f-f5wm5 -- bash

  3. Run an Avro console consumer to consume the messages from the topics. The example below reads the stock-quotes-topic; replace the topic name to check other topics.

kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic stock-quotes-topic --from-beginning --property "schema.registry.url=http://schema-registry:30081"

After client initialization you should see the messages. I've sent some of them multiple times to illustrate:

Topic messages

A good alternative to using the command line and entering the running container is to use kcat or some other Kafka client tooling. Lately I have really come to enjoy Conduktor, which is free for personal use, has a lot of features and is very intuitive to use.

Same messages visualized using Conduktor:

Visualize messages with Conduktor

That's it for this post. Congratulations if you made it this far: it means you're successfully sending messages to both topics, and we have prepared the ground to have some fun with Kafka Streams starting in the next post. Stay tuned.

As usual, you can check out the code up to this stage from the project repo using: git checkout v6

Photo by Adi Goldstein on Unsplash
