Marcos Maia

Spring Kafka Streams playground with Kotlin - III

Context

This post is part of a series where we create a simple Kafka Streams application with Kotlin using Spring Boot and Spring Kafka.

Please check the first part of the tutorial to get started and get further context of what we're building.

If you want to start from here, clone the source code for this project with git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git, check out v6 with git checkout v6, and continue with this post from there.

In this post we're going to create our first Spring Kafka Streams element, a GlobalKTable backed by a RocksDB state store, and expose it through the REST controller created in part II using a ReadOnlyKeyValueStore.

Part III Overview Diagram

Configure App for Spring Kafka Streams

Now that we're going to use Kafka Streams, we need to let Spring Boot know. We start by adding the @EnableKafkaStreams Spring Kafka annotation to the KafkaConfiguration class definition, just below the @EnableKafka one we already have:

@Configuration
@EnableKafka
@EnableKafkaStreams
class KafkaConfiguration { 
...

We also need to add the basic default streams configuration, as we did for the consumers and producers before. This time we will use the programmatic approach instead of adding Spring Boot configuration to the application.yaml file. Both work, but it's nice to have examples of both approaches so you're aware of them and can pick the one that suits you best.

If you have multiple consumers, producers or streams in the same Spring Boot application and want a different configuration for each, you will have to take the programmatic approach: it gives you the flexibility to configure each specific client, while the configuration approach only lets you configure the default Spring bean of each type.
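
As a minimal sketch of why the programmatic approach scales to several clients, the helper below builds one property map per Streams client from shared defaults. The function name streamsProps, the broker address and the second application id are hypothetical and not part of the project; the property keys themselves (bootstrap.servers, application.id, num.stream.threads) are regular Kafka Streams settings.

```kotlin
// Hypothetical helper: shared defaults plus per-client overrides.
fun streamsProps(applicationId: String, overrides: Map<String, Any> = emptyMap()): Map<String, Any> {
    val defaults = mapOf<String, Any>(
        "bootstrap.servers" to "localhost:9092", // assumed local broker address
        "application.id" to applicationId
    )
    // Entries in `overrides` win over the shared defaults.
    return defaults + overrides
}

// Two clients in the same application, each with its own settings.
val quoteProps = streamsProps("quote-stream")
val auditProps = streamsProps("audit-stream", mapOf("num.stream.threads" to 2))
```

Each resulting map could then be wrapped in its own KafkaStreamsConfiguration, which is something a single application.yaml default cannot express.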

In the KafkaConfiguration class we create the default Streams configuration:

@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
fun defaultKafkaStreamsConfig(): KafkaStreamsConfiguration {
    val props: MutableMap<String, Any> = HashMap()
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA_HOSTS
    // Log records that fail to deserialize and keep processing instead of stopping the stream.
    props[StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG] =
        LogAndContinueExceptionHandler::class.java
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = SCHEMA_REGISTRY_URL
    props[StreamsConfig.APPLICATION_ID_CONFIG] = "quote-stream"
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
    props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java
    // Kafka Streams uses application.id as the consumer group id, so this entry is likely redundant.
    props[ConsumerConfig.GROUP_ID_CONFIG] = "stock-quotes-stream-group"

    return KafkaStreamsConfiguration(props)
}

We will also add another bean to the KafkaConfiguration class so we can print out the stream state transitions, which is good for learning and understanding. In a real application you could choose not to do this; if you do, you would use a logger instead of printing to stdout as we do in this example.

@Bean
fun configurer(): StreamsBuilderFactoryBeanConfigurer? {
    return StreamsBuilderFactoryBeanConfigurer { fb: StreamsBuilderFactoryBean ->
        fb.setStateListener { newState: KafkaStreams.State, oldState: KafkaStreams.State ->
            println("State transition from $oldState to $newState")
        }
    }
}
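
For reference, here is a rough sketch of the logger variant mentioned above. It assumes SLF4J is on the classpath (Spring Boot normally brings it in), and the logger name streams-state is made up for this example.

```kotlin
import org.apache.kafka.streams.KafkaStreams
import org.slf4j.LoggerFactory

// Hypothetical logger name; any name or class would do.
private val log = LoggerFactory.getLogger("streams-state")

// Same callback shape as the bean above, but routed through SLF4J instead of stdout.
val loggingStateListener = KafkaStreams.StateListener { newState, oldState ->
    log.info("State transition from {} to {}", oldState, newState)
}
```

Inside the configurer bean you would then call fb.setStateListener(loggingStateListener) instead of passing the println lambda.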

Create a GlobalKTable for Leverage

Now that Kafka Streams is properly configured, we can create our first Kafka Streams element. We will start by creating a GlobalKTable and a materialized view of the Leverage prices.

Create a new class called LeverageStream in the same repository package and annotate it with @Repository.

@Repository
class LeverageStream {
...

Before creating and materializing our GlobalKTable, we need to tell Kafka Streams which serialization and deserialization to use for the objects we will process and store. In Kafka Streams this is done with a Serde, and because we are using Avro we will use the Avro Serde SpecificAvroSerde, passing in the LeveragePrice type we created in a previous step. Add the following to the LeverageStream class to declare, configure and initialize the Serde:

private val leveragePriceSerde = SpecificAvroSerde<LeveragePrice>()

val serdeConfig: Map<String, String> = mapOf(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to SCHEMA_REGISTRY_URL
)

@PostConstruct
fun init() {
    leveragePriceSerde.configure(serdeConfig, false)
}

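
If the idea of a Serde is new to you, the sketch below shows the concept with Kafka's built-in String Serde, chosen only because it needs no Schema Registry. The function name roundTrip and the topic name are placeholders for this example; SpecificAvroSerde plays the same role for LeveragePrice, with the extra step of resolving schemas from the registry.

```kotlin
import org.apache.kafka.common.serialization.Serdes

// A Serde bundles a serializer and a deserializer for one type.
fun roundTrip(symbol: String): String {
    val serde = Serdes.String()
    // Object -> bytes, which is what ends up on the topic and in the state store.
    val bytes: ByteArray = serde.serializer().serialize("any-topic", symbol)
    // Bytes -> object, which is what happens when a record or store entry is read.
    return serde.deserializer().deserialize("any-topic", bytes)
}
```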

Now that we have told Kafka Streams which Serde to use and where to get the Avro schema, we can add the following code, which creates a GlobalKTable with a materialized RocksDB key-value store.

    @Bean
    fun leveragePriceBySymbolGKTable(streamsBuilder: StreamsBuilder): GlobalKTable<String, LeveragePrice> {      
        return streamsBuilder
            .globalTable(
                LEVERAGE_PRICES_TOPIC,
                Materialized.`as`<String, LeveragePrice, KeyValueStore<Bytes, ByteArray>>(LEVERAGE_BY_SYMBOL_TABLE)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(leveragePriceSerde)
            )
    }


By default, the materialized view creates a local RocksDB store for your application. You don't have to manage it or do anything explicitly, it's... magic!

You might be wondering where the heck this RocksDB is being created. You can control that with the StreamsConfig.STATE_DIR_CONFIG configuration, i.e. state.dir, and point it to the location you want. In this case we didn't specify the property, so it will pick the default for the OS you're running this tutorial on. On Linux machines you will find it under the /tmp/kafka-streams/${stream-id}/${type}/rocksdb/${local_storage_name} folder. The full path for this case is /tmp/kafka-streams/quote-stream/global/rocksdb/leverage-by-symbol-ktable/, where you can see the RocksDB files:

RocksDB files
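
To make the folder layout above concrete, here is a small sketch that assembles the path from its parts. The helper name globalStorePath is hypothetical, and the layout is simply the one described in this post for a global store on Linux, not something read from Kafka itself.

```kotlin
// Mirrors the layout described above: <state.dir>/<application.id>/global/rocksdb/<store name>
fun globalStorePath(stateDir: String, applicationId: String, storeName: String): String =
    listOf(stateDir.trimEnd('/'), applicationId, "global", "rocksdb", storeName)
        .joinToString("/")
```

Calling globalStorePath("/tmp/kafka-streams", "quote-stream", "leverage-by-symbol-ktable") gives the full path mentioned above. To move the store elsewhere you would set StreamsConfig.STATE_DIR_CONFIG in defaultKafkaStreamsConfig to a directory of your choice.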

Expose Leverage Materialized view

Great, now that we have local storage, we want to be able to query it to get the most up-to-date Leverage for the instruments.

To query RocksDB we will declare a ReadOnlyKeyValueStore. We use Kotlin's lateinit modifier because the store can only be bound once the stream lifecycle has created it, i.e. the RocksDB storage needs to be in place first. Declare it in LeverageStream:

private lateinit var leveragePriceView: ReadOnlyKeyValueStore<String, LeveragePrice>

Then declare a @Bean listener to initialize it once the stream is created:

@Bean
fun afterStart(sbfb: StreamsBuilderFactoryBean): StreamsBuilderFactoryBean.Listener {
    val listener: StreamsBuilderFactoryBean.Listener = object : StreamsBuilderFactoryBean.Listener {
        override fun streamsAdded(id: String, streams: KafkaStreams) {
            leveragePriceView = streams.store<ReadOnlyKeyValueStore<String, LeveragePrice>>(
                    StoreQueryParameters.fromNameAndType(
                        LEVERAGE_BY_SYMBOL_TABLE,
                        QueryableStoreTypes.keyValueStore()
                    )
            )
        }
    }
    sbfb.addListener(listener)
    return listener
}

We can then create a function that we use to get the specific value passing in a key:

// The key is non-null: the store does not accept null keys.
fun getLeveragePrice(key: String): LeveragePrice? {
    return leveragePriceView[key]
}
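
One thing to keep in mind with lateinit: reading the property before the listener has bound it throws an UninitializedPropertyAccessException. The sketch below shows one possible guard, using a hypothetical LeverageLookup class where a plain Map stands in for the ReadOnlyKeyValueStore so the example runs without Kafka.

```kotlin
// Hypothetical stand-in for LeverageStream: a plain Map plays the role of the store.
class LeverageLookup {
    private lateinit var view: Map<String, Double>

    // Called once the backing store is available (streamsAdded in this post).
    fun bind(store: Map<String, Double>) {
        view = store
    }

    // Returns null both when the key is unknown and when the view is not bound yet.
    fun get(key: String): Double? =
        if (::view.isInitialized) view[key] else null
}
```

Whether to return null or signal "not ready yet" differently is a design choice; for this playground the simple null works fine with the controller shown next.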

Finally, we can add a new REST endpoint to our QuotesController class to expose the Leverage:

    @GetMapping("leveragePrice/{instrumentSymbol}")
    fun getLeveragePrice(@PathVariable instrumentSymbol: String): ResponseEntity<LeveragePriceDTO> {
        // If the leverage doesn't exist in our local store we return no content.
        val leveragePrice: LeveragePrice = leverageStream.getLeveragePrice(instrumentSymbol)
            ?: return ResponseEntity.noContent().build()
        val result = LeveragePriceDTO(leveragePrice.symbol.toString(), BigDecimal.valueOf(leveragePrice.leverage))
        return ResponseEntity.ok(result)
    }

With that we can build and run our application: mvn clean package -DskipTests && mvn spring-boot:run. Make sure you have a local Kafka running, as explained in part I and part II of this series, and execute some calls:

  • Add a leverage which will send a new Leverage to the Kafka Topic:
POST http://localhost:8080/api/leverage
Content-Type: application/json

< ./test-data/leveragePrice1.json

  • Query it from the RocksDB local storage:

GET http://localhost:8080/api/leveragePrice/APPL

Add a few more quotes and play around querying it so you'll see how the storage is automatically updated with the most up to date quote you've sent.
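
If you're curious why the latest value always wins, the table behaves like a map that is updated record by record. Here is a toy sketch of that idea in plain Kotlin, with no Kafka involved; the function name materialize and the sample values are made up for illustration.

```kotlin
// Each record is (key, value); a null value is a tombstone that deletes the key.
fun materialize(records: List<Pair<String, Double?>>): Map<String, Double> {
    val table = mutableMapOf<String, Double>()
    for ((key, value) in records) {
        if (value == null) {
            table.remove(key)
        } else {
            table[key] = value // later records overwrite earlier ones for the same key
        }
    }
    return table
}
```

Feeding it two records for the same symbol leaves only the second value in the map, which is the behavior you observe when querying the endpoint after posting a new leverage.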

As usual, you can check the code up to this point by cloning the project repo with git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git and checking out v7 with git checkout v7.

This is it for part III of this series. I hope you had some fun learning and playing with Kafka Streams so far. In the next post we will create a Stream for Quotes, join it with Leverage, and branch the results to different topics based on some criteria. Stay tuned.

Have fun, be kind to yourself and others!

Photo by Ryland Dean on Unsplash
