Context
This post is part of a series where we create a simple Kafka Streams application with Kotlin using Spring Boot and Spring Kafka.
Please check the first part of the tutorial to get started and get further context of what we're building.
If you want to start from here you can clone the source code for this project
git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git
and then check out v7:
git checkout v7
and follow from there continuing with this post.
In this post we're going to create our QuoteStream. We will process messages from the quote topic, join them with the leverage GlobalKTable we created in the previous post, and then use branching to send quotes to three different topics based on their keys.
Creating a Quote Stream
Let's now create a stream from the stock-quotes-topic and join it with the Leverage GlobalKTable we created in the last post. We will use a stream join and, if there's a Leverage for that specific quote, enrich the quote before publishing it to a topic based on its key. This demonstrates the use of branching to separate data for post-processing, which is quite a common use case.
In our sample application we will produce the data to one of three new topics: Apple quotes go to one topic, Google quotes to another, and all other quotes to a third.
- AAPL -> apple-stocks-topic
- GOOGL -> google-stocks-topic
- Any others -> all-other-stocks-topic
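As a rough sketch of the routing rule listed above, the plain Kotlin function below maps a symbol to its destination topic. The name topicFor is hypothetical and not part of the project; the actual application does this routing with Kafka Streams branching rather than with a helper like this.

```kotlin
// Hypothetical helper for illustration only: it mirrors the routing table
// above and is not part of the project's source code.
fun topicFor(symbol: String): String = when {
    symbol.equals("AAPL", ignoreCase = true) -> "apple-stocks-topic"
    symbol.equals("GOOGL", ignoreCase = true) -> "google-stocks-topic"
    else -> "all-other-stocks-topic"
}

fun main() {
    // Print the destination topic for a few sample symbols.
    listOf("AAPL", "googl", "MSFT").forEach { symbol ->
        println("$symbol -> ${topicFor(symbol)}")
    }
}
```

The comparison ignores case, which matches how the stream predicates later in this post compare keys.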
To route quotes this way, our first step is to create those topics using the Admin client. Add the following constants to the KafkaConfiguration class:
const val AAPL_STOCKS_TOPIC = "apple-stocks-topic"
const val GOOGL_STOCKS_TOPIC = "google-stocks-topic"
const val ALL_OTHER_STOCKS_TOPIC = "all-other-stocks-topic"
Change the appTopics function in the same class to also create those three new topics:
@Bean
fun appTopics(): NewTopics {
    return NewTopics(
        TopicBuilder.name(STOCK_QUOTES_TOPIC).build(),
        TopicBuilder.name(LEVERAGE_PRICES_TOPIC)
            .compact().build(),
        TopicBuilder.name(AAPL_STOCKS_TOPIC).build(),
        TopicBuilder.name(GOOGL_STOCKS_TOPIC).build(),
        TopicBuilder.name(ALL_OTHER_STOCKS_TOPIC).build(),
    )
}
The next time you run the application those new topics will be created using the admin client.
Let's now add the schema definition we will use for those new topics. Create a new Avro schema file under src > main > avro called processed-quote with the following content:
{
  "namespace": "com.maia.springkafkastreamkotlin.repository",
  "type": "record",
  "name": "ProcessedQuote",
  "fields": [
    { "name": "symbol", "type": "string"},
    { "name": "tradeValue", "type": "double"},
    { "name": "tradeTime", "type": ["null", "long"], "default": null},
    { "name": "leverage", "type": ["null", "double"], "default": null}
  ]
}
Notice that the only difference in this case is the new leverage field, which we will use to enrich the incoming quote when a matching leverage exists. Build the project so the Java code is generated for this new Avro schema:
mvn clean package -DskipTests
Let's now create a new class in the repository package called QuoteStream. We will need a reference to our Leverage GlobalKTable, so we use Spring Boot dependency injection:
@Repository
class QuoteStream(val leveragepriceGKTable: GlobalKTable<String, LeveragePrice>) {
...
In this class, declare a function to process and enrich the quote:
@Bean
fun quoteKStream(streamsBuilder: StreamsBuilder): KStream<String, ProcessedQuote> {
In this function, create a KStream that processes the data from the stock-quotes-topic, joins it with the GlobalKTable we created for leverage, and transforms the result into the new Avro type ProcessedQuote, enriching the quotes with leverage if it's available:
    val stream: KStream<String, StockQuote> = streamsBuilder.stream(STOCK_QUOTES_TOPIC)
    val resStream: KStream<String, ProcessedQuote> = stream
        .leftJoin(leveragepriceGKTable,
            { symbol, _ -> symbol },
            { stockQuote, leveragePrice ->
                ProcessedQuote(
                    stockQuote.symbol,
                    stockQuote.tradeValue,
                    stockQuote.tradeTime,
                    leveragePrice?.leverage
                )
            }
        )
To wrap up this function, we tap into the new stream and, based on the key, send each message to a specific topic. We then return the new stream so we can reuse it later for other operations:
    KafkaStreamBrancher<String, ProcessedQuote>()
        .branch({ symbolKey, _ -> symbolKey.equals("AAPL", ignoreCase = true) }, { ks -> ks.to(AAPL_STOCKS_TOPIC) })
        .branch({ symbolKey, _ -> symbolKey.equals("GOOGL", ignoreCase = true) }, { ks -> ks.to(GOOGL_STOCKS_TOPIC) })
        .defaultBranch { ks -> ks.to(ALL_OTHER_STOCKS_TOPIC) }
        .onTopOf(resStream)
    return resStream
}
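To make the left join behaviour easier to picture, here is a minimal sketch in plain Kotlin that emulates it with a Map instead of a GlobalKTable. Quote, Enriched and enrich are simplified stand-ins invented for this illustration; they are not the Avro-generated classes or any Kafka Streams API.

```kotlin
// Simplified stand-ins for the Avro-generated classes (assumptions for illustration only).
data class Quote(val symbol: String, val tradeValue: Double, val tradeTime: Long?)
data class Enriched(
    val symbol: String,
    val tradeValue: Double,
    val tradeTime: Long?,
    val leverage: Double?
)

// Emulates a left join against a lookup table: every quote is kept, and
// leverage stays null when the table has no entry for the quote's symbol.
fun enrich(quote: Quote, leverageBySymbol: Map<String, Double>): Enriched =
    Enriched(quote.symbol, quote.tradeValue, quote.tradeTime, leverageBySymbol[quote.symbol])

fun main() {
    val leverage = mapOf("AAPL" to 2.0)
    println(enrich(Quote("AAPL", 150.0, null), leverage)) // symbol present in the table
    println(enrich(Quote("MSFT", 300.0, null), leverage)) // symbol missing from the table
}
```

With an inner join the second quote would be dropped; with a left join it is still emitted, only without a leverage value.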
If you just want to get your local code to this point without using the presented code you can checkout v8:
git checkout v8
Cool, now we can play around a bit with it. Let's build and run our application (make sure your local Kafka setup is running):
mvn clean package -DskipTests && mvn spring-boot:run
You can then send some leverage messages and quote messages using the APIs as we did before in this tutorial.
You can then check that messages are enriched when the specific quote has a leverage, and that they flow to the three different topics based on their keys. Here are some screenshots from the topics that I took while playing around with the project using Conduktor.
The messages seem duplicated on the screenshots but that's because I sent them multiple times with the same values while playing around. I also sent a few before sending the respective leverage so you can see what happens and check that the initial ones on the bottom have a null leverage.
That's it for now. Tomorrow I will be publishing part V of this tutorial where we will use grouping, counting and calculate volume using Kafka Streams DSL. Stay tuned.
Photo by Nubelson Fernandes on Unsplash