DEV Community

Marcos Maia

Spring Kafka Streams playground with Kotlin - IV

Context

This post is part of a series where we create a simple Kafka Streams application with Kotlin using Spring Boot and Spring Kafka.

Please check the first part of the tutorial to get started and get further context of what we're building.

If you want to start from here, clone the source code for this project (git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git), check out v7 (git checkout v7), and continue from there with this post.

In this post we're going to create our QuoteStream. We will process messages from the quote topic and join them with the leverage GlobalKTable we created in the previous post. We will then branch the stream and send quotes to three different topics based on their keys.

Part IV Overview

Creating a Quote Stream

Let's now create a stream from the stock-quotes-topic and join it with the Leverage GlobalKTable we created in the last post. We will use a left join: if there's a Leverage for that specific quote we enrich the quote with it, otherwise the leverage stays null, and either way we publish the quote to one topic based on its key. This demonstrates the use of branching to separate data for post-processing, which is quite a common use case.

In our sample application we will produce the data to one of three new topics: Apple quotes go to one topic, Google quotes to another, and all other quotes to a third.

  • AAPL -> apple-stocks-topic
  • GOOGL -> google-stocks-topic
  • Any others -> all-other-stocks-topic

To do that, our first step is to create those topics using the Admin client. Add the following constants to the KafkaConfiguration class:

const val AAPL_STOCKS_TOPIC = "apple-stocks-topic"
const val GOOGL_STOCKS_TOPIC = "google-stocks-topic"
const val ALL_OTHER_STOCKS_TOPIC = "all-other-stocks-topic"

Change the appTopics function in the same class to also create those three new topics:

@Bean
fun appTopics(): NewTopics {
    return NewTopics(
        TopicBuilder.name(STOCK_QUOTES_TOPIC).build(),
        TopicBuilder.name(LEVERAGE_PRICES_TOPIC)
            .compact().build(),
        TopicBuilder.name(AAPL_STOCKS_TOPIC).build(),
        TopicBuilder.name(GOOGL_STOCKS_TOPIC).build(),
        TopicBuilder.name(ALL_OTHER_STOCKS_TOPIC).build(),
    )
}

The next time you run the application those new topics will be created using the admin client.

Let's now add the schema definition we will use for those new topics. Create a new Avro schema file under src > main > avro called processed-quote with the following content:

{
  "namespace": "com.maia.springkafkastreamkotlin.repository",
  "type": "record",
  "name": "ProcessedQuote",
  "fields": [
    { "name": "symbol", "type": "string"},
    { "name": "tradeValue", "type": "double"},
    { "name": "tradeTime", "type": ["null", "long"], "default": null},
    { "name": "leverage", "type": ["null", "double"], "default":  null}
  ]
}

Notice that the only difference from the quote schema is a new leverage field, which we will use to enrich the incoming quote when a matching leverage exists. Build the project so the Java code is generated for this new Avro schema:

mvn clean package -DskipTests

Let's now create a new class in the repository package called QuoteStream. We need a reference to our Leverage GlobalKTable, so we use Spring's dependency injection:

@Repository
class QuoteStream(val leveragepriceGKTable: GlobalKTable<String, LeveragePrice>) {
...

In this class, declare a function to process and enrich the quote:

@Bean
fun quoteKStream(streamsBuilder: StreamsBuilder): KStream<String, ProcessedQuote> {


In this function, create a KStream that processes the data from the stock-quotes-topic, left-joins it with the GlobalKTable we created for leverage, and transforms each record into the new Avro type ProcessedQuote, enriching the quote with the leverage if one is available:

val stream: KStream<String, StockQuote> = streamsBuilder.stream(STOCK_QUOTES_TOPIC)

val resStream: KStream<String, ProcessedQuote> = stream
    .leftJoin(leveragepriceGKTable,
        { symbol, _ -> symbol },
        { stockQuote, leveragePrice ->
            ProcessedQuote(
                stockQuote.symbol,
                stockQuote.tradeValue,
                stockQuote.tradeTime,
                leveragePrice?.leverage
            )
        }
    )
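
To make the left-join behaviour concrete, here is a minimal plain-Kotlin sketch with no Kafka involved. The Quote and Enriched data classes, the leftJoin helper and the sample values are all hypothetical stand-ins invented for illustration; a Map plays the role of the GlobalKTable. It only shows the semantics: every quote is kept, and the leverage is null when the table has no entry for the symbol.

```kotlin
// Hypothetical stand-ins for the generated Avro classes (illustration only).
data class Quote(val symbol: String, val tradeValue: Double, val tradeTime: Long?)
data class Enriched(
    val symbol: String,
    val tradeValue: Double,
    val tradeTime: Long?,
    val leverage: Double?
)

// A Map stands in for the GlobalKTable: symbol -> latest leverage.
fun leftJoin(quotes: List<Quote>, leverageBySymbol: Map<String, Double>): List<Enriched> =
    quotes.map { q ->
        // Left join: the quote is always emitted; a missing lookup yields a null leverage.
        Enriched(q.symbol, q.tradeValue, q.tradeTime, leverageBySymbol[q.symbol])
    }

fun main() {
    // Made-up sample data: only AAPL has a leverage entry.
    val leverage = mapOf("AAPL" to 2.0)
    val quotes = listOf(Quote("AAPL", 150.0, 1L), Quote("MSFT", 300.0, 2L))
    leftJoin(quotes, leverage).forEach(::println)
}
```

With an inner join instead, the MSFT quote in this sample would be dropped rather than emitted with a null leverage.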

To wrap up this function, we tap into the new stream and, based on the key, send each message to a specific topic. We then return the new stream so we can reuse it later for other operations:

KafkaStreamBrancher<String, ProcessedQuote>()
            .branch({ symbolKey, _ -> symbolKey.equals("AAPL", ignoreCase = true) }, { ks -> ks.to(AAPL_STOCKS_TOPIC) })
            .branch({ symbolKey, _ -> symbolKey.equals("GOOGL", ignoreCase = true) }, { ks -> ks.to(GOOGL_STOCKS_TOPIC) })
            .defaultBranch { ks -> ks.to(ALL_OTHER_STOCKS_TOPIC) }
            .onTopOf(resStream)

    return resStream
}
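
The branching step follows a first-match rule: each record goes to the first branch whose predicate accepts it, and anything left over goes to the default branch. The sketch below models that rule in memory so it can be run without Kafka. ToyBrancher is a hypothetical class written for this illustration, not Spring's KafkaStreamBrancher, and the sample records are made up.

```kotlin
// Toy in-memory model of first-match branching; NOT Spring's KafkaStreamBrancher.
class ToyBrancher<K, V> {
    private val branches = mutableListOf<Pair<(K, V) -> Boolean, String>>()
    private var defaultTopic: String? = null

    // Registers a branch; branches are evaluated in registration order.
    fun branch(predicate: (K, V) -> Boolean, topic: String) =
        apply { branches.add(predicate to topic) }

    // Registers the topic that receives records no branch accepted.
    fun defaultBranch(topic: String) = apply { defaultTopic = topic }

    // Sends each record to the first matching branch, else to the default topic.
    fun route(records: List<Pair<K, V>>): Map<String, List<V>> {
        val out = linkedMapOf<String, MutableList<V>>()
        for ((key, value) in records) {
            val topic = branches.firstOrNull { (p, _) -> p(key, value) }?.second
                ?: defaultTopic
            if (topic != null) out.getOrPut(topic) { mutableListOf() }.add(value)
        }
        return out
    }
}

fun main() {
    val routed = ToyBrancher<String, Double>()
        .branch({ k, _ -> k.equals("AAPL", ignoreCase = true) }, "apple-stocks-topic")
        .branch({ k, _ -> k.equals("GOOGL", ignoreCase = true) }, "google-stocks-topic")
        .defaultBranch("all-other-stocks-topic")
        .route(listOf("aapl" to 150.0, "GOOGL" to 2800.0, "MSFT" to 300.0))
    println(routed)
}
```

Because matching is case-insensitive, the lowercase "aapl" key in the sample still lands in the Apple topic, while "MSFT" falls through to the default.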

If you just want to get your local code to this point without typing in the code presented here, you can check out v8: git checkout v8

Cool, now we can play around with it a bit. Let's build and run our application (make sure your local Kafka setup is running):

mvn clean package -DskipTests && mvn spring-boot:run

You can then send some leverage messages and quote messages using the APIs as we did before in this tutorial.

You can then check that the messages are enriched when the specific quote has a leverage, and that they flow to the three different topics based on their keys. Here are some screenshots of the topics I took while playing around with the project using Conduktor.

Google topic screenshot

Apple topic screenshot

The messages seem duplicated on the screenshots but that's because I sent them multiple times with the same values while playing around. I also sent a few before sending the respective leverage so you can see what happens and check that the initial ones on the bottom have a null leverage.

That's it for now. Tomorrow I will be publishing part V of this tutorial, where we will use grouping and counting and calculate volume using the Kafka Streams DSL. Stay tuned.

Photo by Nubelson Fernandes on Unsplash