DEV Community

Vinicius Carvalho
Vinicius Carvalho

Posted on

Kotlin coroutine based KafkaProducer extension

Getting rid of Callback hell in KafkaProducer

Kotlin's coroutines provides a nice way to write async code. It makes it easy to write and compose asynchronous computation using a very light-weight model.

This post is not about what coroutines are, the link with the docs have a very deep and easy to read explanation of that. Instead I'm offering a solution to using KafkaProducer.send method.

The issue is send() leverages a Callback strategy, and we all know that there's a special place in hell for those who use callbacks.

Fortunately Kotlin coroutines offer a solution: suspendCoroutine function, that allows us to transform a callback into a suspend function call.

Receiver functions are also another nice treat of Kotlin language. It allows us to augment regular types with custom functions.

I decided to call the new function dispatch instead of send because I find a bit confusing when people decide to extend original function using the same name, and imports can get a bit messy.

So the extension function you need to write is very simple:

suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        val callback = Callback { metadata, exception ->
            if (metadata == null) {
                continuation.resumeWithException(exception!!)
            } else {
                continuation.resume(metadata)
            }
        }
        this.send(record, callback)
    }
Enter fullscreen mode Exit fullscreen mode

Now you can just use it from your regular KafkaProducer instance:

val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["key.serializer"] = StringSerializer::class.java
props["value.serializer"] = JacksonSerializer::class.java
val kafkaProducer = KafkaProducer<String, SensorReading>(props)
async {
    kafkaProducer.dispatch(ProducerRecord("sample", SensorReading("Bedroom", 72.0, false)))
}

Enter fullscreen mode Exit fullscreen mode

Just remember that you can only call a suspend function within the boundaries of a coroutine, hence the need for async, same could be achieved with launch or runBlocking for testing.

Happy coding!

Top comments (2)

Collapse
 
asafmesika profile image
asaf mesika

Did you manage to git rid of Threads in Consumer side as well?

Collapse
 
gklijs profile image
Gerard Klijs

Consumer is not thread safe, so I don't see how, unless you can 'force' a coroutine to always run on the same underlying thread. But that would kind of defeat the purpose for coroutines.