loading...
Cover image for Vert.x Kotlin Coroutines

Vert.x Kotlin Coroutines

frosnerd profile image Frank Rosner Originally published at blog.codecentric.de ・6 min read

Vert.x

Eclipse Vert.x is an event-driven application framework that runs on the JVM. Architecturally it is very similar to Node.js, having a single-threaded event loop at its core and it heavily relies on non-blocking operations in order to be scalable. All functions of the Vert.x APIs are asynchronous and you can compose them based on callback handlers.

The following code snippet creates an HTTP server and as soon as it listens on a random port we are sending a request, printing the response after it arrives.

vertx.createHttpServer().listen {
    if (it.succeeded()) {
        val server = it.result()
        WebClient.create(vertx).get(server.actualPort(), "localhost", "").send {
            if (it.succeeded()) {
                val response = it.result()
                println(response.bodyAsString())
            }
        }
    }
}

Callback composition however can be very tedious (aka callback hell). Java developers are often utilizing Vert.x RxJava to compose asynchronous and event based programs using observable sequences. When using Vert.x with Kotlin we have another alternative for asynchronous composition: Kotlin coroutines. Below you will find the same code as before but written with coroutines rather than callback handlers:

val server = vertx.createHttpServer().listenAwait()
val response = WebClient.create(vertx)
    .get(server.actualPort(), "localhost", "").sendAwait()
println(response.bodyAsString())

The next section is going to give a quick introduction into Kotlin coroutines, independently of Vert.x. Afterwards we will see how coroutines integrate into the Vert.x APIs and how we can use that. We will close the post by summarizing the main findings.

Coroutines

Coroutine Basics

Concurrent programs are hard to reason about. Coroutines are a powerful tool for writing concurrent code. Although other models like promises or callbacks exist they can be quite difficult to understand if they are nested deeply. With coroutines you can write your asynchronous code as it was synchronous and abstract away parts of the concurrency. Let's look at the "Hello World" example in Kotlin provided by the official documentation:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // launch new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

This program will first print Hello, and then World!. The coroutine is launched asynchronously but then delayed for 1 second. The main function continues and immediately prints Hello,. It then puts the main thread to sleep so we give the coroutine some time to finish.

That seems nice but how is that different or better than using good old threads? And how does that help to avoid callback hell? First, let's do a quick dive into the anatomy of a coroutine and how they are executed.

The Anatomy of a Coroutine

Coroutines logically behave similar to threads but they are implemented differently. In fact a single thread can potentially run thousands of coroutines. This is possible because coroutines can suspend their execution, allowing the running thread to move to another coroutine and come back later. This is useful for operations that are waiting for I/O, e.g. network or file access.

Coroutines are implemented as a library rather than a language feature of Kotlin. Only the suspend keyword which is used to define suspending functions is part of the language. This enables us to switch to a different implementation for execution if required.

Coroutines can only be launched when there is a CoroutineScope available. The scope provides methods for launching coroutines, as well as an instance of a coroutine context. Coroutines always execute within a CoroutineContext, which contains a CoroutineDispatcher. The dispatcher is managing the thread or thread pool which executes the coroutines. By default a coroutine will use the context available within the current scope.

Why Coroutines Are Great

Since the rise of concurrent applications programmers are looking for abstractions that enable you to write concurrent code in a concise and understandable way. Common approaches are threads, callbacks, and futures.

Threads are expensive to create and context switching costs a lot of overhead. Callbacks might produce a Christmas tree of curly braces in your code if chained together. Futures are a nice abstraction in my opinion but require good language support (which also heavily depends on the language or library you are using) and can be difficult to grasp if you are not familiar with functional programming.

In the beginning I mentioned that coroutines allow you to write asynchronous code the same way as you would with synchronous code. What does that mean? Let's look at a simple example:

fun placeOrder(userData: UserData, orderData: OrderData): Order {
    val user = createUser(userData) // synchronous call to user service
    val order = createOrder(user, orderData) // synchronous call to order service
    return order
}

fun createUser(userData: UserData): User { ... }
fun createOrder(user: User, orderData: OrderData): Order { ... }

This code places an order of a new shop user by first creating a user by calling the user service and then placing an order for the user. createUser and createOrder are blocking operations and will block the executing thread until they are complete. Most likely they will involve some sort of I/O.

Now if we use a non-blocking library to perform the I/O we can suspend the computation until the I/O is finished and work on something else in the meantime:

suspend fun placeOrder(userData: UserData, orderData: OrderData): Order {
    val user = createUser(userData) // asynchronous call to user service
    val order = createOrder(user, orderData) // asynchronous call to order service
    return order
}

suspend fun createUser(userData: UserData): User { ... }
suspend fun createOrder(user: User, orderData: OrderData): Order { ... }

At some point we have to provide a coroutine context, e.g. by wrapping the placeOrder function inside a coroutine scope. However we did not have to actually modify the structure of the code. We only add suspend keywords or wrap a function inside launch or similar functions and that's it.

It is important to note the difference between blocking (e.g. Thread.sleep) and non-blocking (e.g. delay) operations. If your underlying functions are blocking, the coroutine will not suspend but instead block the thread it is currently executed in. But what if your non-blocking library is not written with coroutines in mind? Vert.x core API is non-blocking but heavily relies on callback handlers. In the next section we will look at how we can convert the callback API into a coroutine friendly one with just a thin layer on top.

Vert.x Kotlin Coroutines

The vertx-lang-kotlin-coroutines package mostly consists of automatically generated code that wraps the callback handler based Vert.x core API inside suspend functions so they are easy to use within coroutines.

The HttpServer class from the HTTP module provides a listen method to start the server. In Vert.x all operations are non-blocking so you will have to provide a callback handler to make sure you get the server once it is started. This is the signature of the method:

fun listen(port: Int, host: String, listenHandler: Handler<AsyncResult<HttpServer>>): HttpServer

What we would like to have, however, is a function like this:

suspend fun HttpServer.listenAwait(port: Int, host: String): HttpServer

And this is where the Vert.x coroutines package comes into play. It converts any function that requires a handler (e.g. listen) into a suspending function (e.g. listenAwait) using this helper method:

suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T {
  return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
    try {
      block.invoke(Handler { t ->
        cont.resume(t)
      })
    } catch (e: Exception) {
      cont.resumeWithException(e)
    }
  }
}

It takes the block of code that requires a handler and turns it into a coroutine. The suspendCancellableCoroutine function is provided by the core library of Kotlin coroutines. It wraps the given code into a coroutine and suspends it immediately. You can then resume or cancel it manually by accessing the cont: CancellableContinuation<T> object. We then simply resume as soon as the handler has returned a value and return this value from the function. In case the handler fails we resume with an exception.

The following animation illustrates how a Vert.x web server would handle multiple requests using coroutine handlers that use a non-blocking database connection. On every method involving database access the handler function suspends and allows other handlers to occupy the thread.

Coroutine animation

Summary

In this post I showed you how to compose asynchronous functions using coroutines. Coroutines are a nice alternative to callback based composition that are available out of the box in Kotlin. Thanks to the Vert.x Kotlin coroutines integration you can use coroutines in Vert.x as well.

Have you used coroutines before, maybe in another language? Have you used Vert.x and if so, which composition style do you prefer? Let me know your thoughts in the comments.

Posted on by:

frosnerd profile

Frank Rosner

@frosnerd

My professional interests are cloud and big data technologies, machine learning, and software development. I like to read source code and research papers to understand how stuff works.

Discussion

markdown guide
 

A question: Vertx dispatches 1 eventloop in a single thread, and dispatches 1 eventloop for each processor core (please correct me if I remember wrong).

The event loop works already in an asynchronous and non blocking way: if you block an operation you get a warning from the framework.

So I don't understand the necessity to create an async layer (the coroutine) over another async layer (the event loop).

Am I missing some piece or I have misunderstood any concept? Can you clarify?

 

Hi Marco!

I don't think you misunderstood and this is an excellent question.

Coroutines themselves are another layer on top of the event callbacks. So you are wrapping the callback based API inside coroutines in order to enable composition of asynchronous functions through Coroutines rather than callback chains.

This does not necessarily mean that the coroutines are executed outside of the event loop. While you can use a different coroutine context (e.g. runBlocking when starting your Vert.x web server to make sure to resume only as soon as it is started), most of the cases your coroutines will run through the Vert.x dispatcher.

Let's say we are building a web application and we want to write our handlers as coroutines. We can do that with a simple helper function:

fun coroutineHandler(route: Route, handler: suspend (RoutingContext) -> Unit) {
    route.handler { ctx ->
        launch(ctx.vertx().dispatcher()) {
            try {
                handler(ctx)
            } catch (e: Exception) {
                log.error("Coroutine handler failed: $e")
                ctx.fail(e)
            }
        }
    }
}

With this helper we can add handlers to routes that do not take a callback but are suspending functions instead. The coroutine itself however will be launched through the Vert.x dispatcher of the routing context, so no new execution layer is added on top.

Inside your coroutine handlers you can then, e.g., use the Vert.x web client which has different sendAwait methods thanks to Vert.x Kotlin Coroutines so they will not block your event thread.

I hope it became clearer now. Please let me know if my explanation made sense and if you have more questions :)

 
 

Amongst other issues, callbacks were the main reason not to use vert.x when vert.x 2 came out, I wrote an article about why i wouldn’t use it in production. A couple of things have been fixed with 3 but callback still made code unnecessarily unreadable for the simplest tasks. In that article suggested using coroutines, so i’m quite happy to see it. However, it’s still not available to Java. But maybe that’s one more reason to think about switching to Kotlin... 😉

 

Hi! Thanks for the comment :)

Would you mind sharing a link to your article?

 

Sure, here it is:
return.co.de/blog/articles/vertx-b...

The fun-fact is, that lager parts of my company used vert.x for 2-3 year. I strictly opposed and my team used spring boot and the recent reactive webflux. Now, all new services will be built using spring boot/webflux.

For simple CRUD style APIs with relational DBs, reactive just makes everything much more complicated without any benefit.

We have one or two services, that are doing heavy event lifting where reactive style makes sense.
And I am definitely a fan of the actor model and immutable messaging. However, I simply don’t like callbacks and obserables and such...

 

Nice post!

I worked with Vert.x in the past and it was a pain.

I like Javascript async/await style and I find this very similar.