This article was originally published on iO TechHub by Léo Schneider and Mehmet Akif Tütüncü.
Introduction
In this second part of the article, we will show that we can use Kotlin coroutines to have a reactive application while writing code in direct style.
We assume that you start from a reactive Spring Boot project using Reactor, written in Java (see part 1).
First, we will focus on Kotlin and adapt the project to the language. Then, we will explore coroutines and how we can use them.
Reactive with Kotlin
At this point, we have an application that is reactive throughout the whole chain.
We even have a safeguard to detect blocking calls, to verify the non-blocking nature of the code at runtime.
We could stop right here, but there is one more upgrade that we can make at this point.
That upgrade is too interesting to skip, even if it means that we have to change the programming language!
As you read in the title, this is of course Kotlin.
Kotlin Overview
Interestingly enough, Kotlin is a multiplatform language. It became popular through Android development, but it is now used more and more as a backend language.
On the JVM, the compiled code is regular Java bytecode, so it is even possible to have both Kotlin and Java files in the same project.
It also means that we can keep our Java frameworks and libraries, so no other change is required there.
Kotlin Features
Kotlin has many interesting features, some of which have even been adopted by recent versions of Java.
Null Safety
One major feature is that every type comes in two variations: nullable and non-nullable.
The compiler knows at compile time whether a NullPointerException could happen on a nullable value, and refuses to compile if such a case isn't handled.
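As a minimal sketch of what this looks like in practice (the function names and the city lookup are hypothetical, chosen only for illustration), a nullable return type forces the caller to deal with the missing value:

```kotlin
// Hypothetical lookup: the return type String? says that "no result" is possible.
fun findCity(id: Int): String? = if (id == 1) "Amsterdam" else null

// The safe call (?.) and the elvis operator (?:) handle the null case explicitly.
fun cityLength(id: Int): Int = findCity(id)?.length ?: 0

fun main() {
    // findCity(2).length would not compile, because the receiver is nullable.
    println(cityLength(1)) // 9
    println(cityLength(2)) // 0
}
```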
Immutability
Kotlin favors immutability by default: the collection interfaces are read-only unless you specify otherwise.
You can use val to declare your variables as references that cannot be reassigned.
Knowing what can and what cannot change helps to reduce the number of side effects.
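A small sketch of the difference, using only the standard library (the helper function withExtraCity is hypothetical):

```kotlin
// Returns a new list instead of changing the one it receives.
fun withExtraCity(cities: List<String>): List<String> = cities + "Utrecht"

fun main() {
    val cities = listOf("Amsterdam", "Rotterdam") // read-only List: no add() available
    // cities.add("Utrecht")  // would not compile
    // cities = emptyList()   // would not compile either: a val cannot be reassigned

    val mutable = mutableListOf("Amsterdam")      // mutability is an explicit choice
    mutable.add("Rotterdam")

    println(withExtraCity(cities)) // [Amsterdam, Rotterdam, Utrecht]
    println(cities)                // [Amsterdam, Rotterdam]
    println(mutable)               // [Amsterdam, Rotterdam]
}
```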
Extension Functions
Kotlin provides the ability to declare functions on a receiver type without inheriting from the class.
You can thus extend the functionality of classes that you don't own.
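For instance, a conversion helper can be attached to Double, a type we do not own. This is only an illustrative sketch, and the function name is made up for the example:

```kotlin
// Adds a function to Double; inside the body, "this" is the receiver value.
fun Double.celsiusToFahrenheit(): Double = this * 9 / 5 + 32

fun main() {
    println(20.0.celsiusToFahrenheit()) // 68.0
}
```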
Standard Library
The standard library of the language has many more convenience functions than Java's.
For example, you don't need to call stream() whenever you want to transform your collections.
And thanks to extension functions, you even get additional methods on your Java types.
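The following sketch shows the idea with a hypothetical warmDays function: the collection operations are called directly on the list, including on a list created through a Java API.

```kotlin
// filter and sortedDescending are standard library extensions, so no stream() or collect() is needed.
fun warmDays(temperatures: List<Double>): List<Double> =
    temperatures
        .filter { it >= 20.0 }
        .sortedDescending()

fun main() {
    // A list coming from Java gets the same extension functions.
    val fromJava: List<Double> = java.util.Arrays.asList(18.5, 24.0, 21.5)
    println(warmDays(fromJava)) // [24.0, 21.5]
}
```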
Migrate the project to Kotlin
The next step is to migrate our project to Kotlin. There are two possible methods:
1. Use IDE Automatic Tool
While it works for most methods, the result is limited to what the IDE can infer from the code.
For example:
@get:GetMapping
val allWeather: Flux<WeatherInfo>
    get() = weatherService.allWeather
This method started with get, so the IDE concluded that it was a property called allWeather.
It created a property with a getter to replace it.
But we humans can see that this was meant to be a method, so you have to readjust it manually to:
fun getAllWeather(): Flux<WeatherInfo> {
    return weatherService.getAllWeather()
}
2. Manually Change the Files
As the code generated automatically by the IDE isn't always very readable,
it is better to rewrite it manually, or at least to review all the generated code.
It sounds like a very time-consuming task, but regexes, find/replace, and multiple carets help
to change the files relatively quickly.
Almost every function will change from this pattern:
public Foo foo(Bar bar) {}
to this one:
fun foo(bar: Bar): Foo {}
Migrate to Data Classes
Java restricts us to one public top-level class per file.
Sometimes we have very simple classes. In that case, we can migrate them to a data class, whose closest Java equivalent is the record.
The added value of Kotlin is that we can put them all in a single file. The resulting file can contain the entire domain for a small module like this one.
package com.io.reactivecoroutines.weather.api

import com.fasterxml.jackson.annotation.JsonProperty
import java.time.LocalDate

data class Forecast(
    @JsonProperty("forecastday") val days: List<ForecastDay>
)

data class ForecastDay(
    val date: LocalDate,
    @JsonProperty("day") val temperature: Temperature,
)

data class Location(
    @JsonProperty("name") val city: String,
    val region: String,
    val country: String,
)

data class Temperature(
    @JsonProperty("maxtemp_c") val maxC: Double,
    @JsonProperty("maxtemp_f") val maxF: Double,
    @JsonProperty("mintemp_c") val minC: Double,
    @JsonProperty("mintemp_f") val minF: Double,
    @JsonProperty("avgtemp_c") val avgC: Double,
    @JsonProperty("avgtemp_f") val avgF: Double,
)
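To illustrate what a data class generates for us, here is a small sketch using a simplified copy of Location (the Jackson annotation is left out so that the example only needs the standard library, and the sample values are made up):

```kotlin
// Simplified version of the Location class above, without the Jackson annotation.
data class Location(val city: String, val region: String, val country: String)

fun main() {
    val home = Location("Amsterdam", "North Holland", "Netherlands")
    val moved = home.copy(city = "Haarlem") // copy() changes only the named properties

    // equals(), hashCode() and toString() are generated from the constructor properties.
    println(home == Location("Amsterdam", "North Holland", "Netherlands")) // true
    println(moved) // Location(city=Haarlem, region=North Holland, country=Netherlands)
}
```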
Testing
If you are using Mockito, you will notice that the function when() can no longer be called directly, because when is a reserved keyword in Kotlin (it would have to be escaped with backticks).
Mockito has a Kotlin variation of the library: add the dependency org.mockito.kotlin:mockito-kotlin to get better support than with the Java one.
There, when is called whenever, but aside from that, almost everything should be similar.
Now that we have the whole project in Kotlin, we can go right away to what we are looking for: coroutines.
Coroutines
Coroutines were first introduced in "Design of a Separable Transition-Diagram Compiler", published in 1963.
A coroutine represents a separable part of a program that can pause and resume its execution.
It means that if we have functions that can be suspended, we can have non-blocking behavior and asynchronous calls.
With more tweaking, we can have concurrency and enable parallelism.
Launching a new flow of execution with coroutines is very lightweight compared to instantiating a new thread, so we can have many coroutines at the same time.
This shows how powerful coroutines are and why they get interesting for us, especially for non-blocking scenarios.
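The pause and resume behavior can be observed with the standard library alone. The sketch below uses the sequence builder, which is backed by a coroutine; it is not the kotlinx.coroutines API used in the rest of the article, and the function name is hypothetical.

```kotlin
// Returns the order in which producer and consumer steps actually ran.
fun interleaving(): List<String> {
    val events = mutableListOf<String>()

    // sequence { } is backed by a coroutine from the standard library:
    // each yield() suspends the block until the consumer asks for the next value.
    val numbers = sequence {
        events += "producing 1"
        yield(1)
        events += "producing 2"
        yield(2)
    }

    for (n in numbers) {
        events += "consumed $n"
    }
    return events
}

fun main() {
    // Prints: producing 1, consumed 1, producing 2, consumed 2 (one per line)
    interleaving().forEach(::println)
}
```

The producer block does not run to completion before the consumer starts: both advance in turns on a single thread, which is the essence of a suspendable computation.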
Kotlin has coroutines that are supported by the language and implemented by a library, kotlinx.coroutines (the implementation remains platform-dependent).
A sandbox environment is available here to try it out.
Coroutines as Part of the Language
Coroutines in Kotlin allow us to write code in direct style that nevertheless runs asynchronously, with callbacks under the hood.
When you mark a function with the suspend modifier, you indicate that this function can suspend its execution.
Thus, it can only be called from a coroutine or from another suspending function. The compiler effectively adds a hidden Continuation parameter to the function, which plays the role of the callback.
In direct style, we would typically have code written like this:
fun postItem(item: Item) {
    val token = createToken()
    val response = client.post(item, token)
    handleResponse(response)
}
If we chain callbacks to express the continuation of the flow, it looks something like this, which quickly gets difficult to read:
fun postItem(item: Item) {
    createToken { token ->
        client.post(item, token) { response ->
            handleResponse(response)
        }
    }
}
Simply by adding the suspend keyword to the direct-style code (with createToken() and client.post() being suspending functions too), we get it to run with continuation callbacks!
We can now have the best of both worlds.
suspend fun postItem(item: Item) {
    /*Suspend*/ val token = createToken()
    /*Suspend*/ val response = client.post(item, token)
    handleResponse(response)
}
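To make the link between suspend and Continuation more tangible, here is a self-contained sketch that relies only on the standard library. The callback API createTokenAsync, the String-based postItem, and the helper runPostItem are hypothetical stand-ins for the client above; in a real project, kotlinx.coroutines or Spring would start the coroutine for us.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API, standing in for a non-blocking client.
fun createTokenAsync(callback: (String) -> Unit) = callback("token-123")

// Bridges the callback API to a suspending function: the Continuation acts as the callback.
suspend fun createToken(): String = suspendCoroutine { continuation ->
    createTokenAsync { token -> continuation.resume(token) }
}

// Direct style: reads sequentially, yet the call to createToken() may suspend.
suspend fun postItem(item: String): String {
    val token = createToken()
    return "posted $item with $token"
}

// Starts the suspending function with the standard library only and captures its result.
// This works here because the hypothetical callback fires synchronously.
fun runPostItem(item: String): String {
    var outcome: Result<String>? = null
    val block: suspend () -> String = { postItem(item) }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result -> outcome = result })
    return outcome!!.getOrThrow()
}

fun main() {
    println(runPostItem("book")) // posted book with token-123
}
```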
Integration With Project Reactor
If you want to keep your code just the way it is with Project Reactor and benefit from its specific APIs,
then you can simply wrap everything in Kotlin code, just the way you had it in Java.
Thanks to the interoperability between the libraries, you can use asFlow() and asFlux() to convert between Reactor types and coroutine types.
This is especially valuable if you intend to migrate, because in that case you can do it step by step.
You can benefit from the quality of life offered by Kotlin and keep your Reactor-specific code just as it is.
But that alone wouldn't be a good reason to migrate the whole project.
It is interesting to know that this is possible, but what we really want is to use coroutines, as they significantly reduce the complexity of the code that you read.
Coroutines Migrations
In our project, we need to make all the functions in the chain suspending, from the controllers to the repositories.
The best approach is to start from the outer layers, such as the repositories, and then move up the chain layer by layer.
1. Repository Layer
In the repository layer, we previously had a reactive repository:
@Repository
interface WeatherRepository : ReactiveCrudRepository<WeatherInfo, String>
We can change it to CoroutineCrudRepository, and there is also a variant that supports sorting: CoroutineSortingCrudRepository.
These interfaces come with the dependency spring-boot-starter-data-r2dbc.
@Repository
interface WeatherRepository : CoroutineCrudRepository<WeatherInfo, String>
If we check the function findById(), we can see that the repository provides this (T and ID stand for our entity and identifier types):
suspend fun findById(id: ID): T?
This is the amazing part! This function is suspending and the repository is non-blocking, but we don't have to deal with the reactive types!
No more type wrapping is necessary!
If we have a look at the findAll() method, we can see that it is not suspending and returns a Flow, the coroutine counterpart of a reactive stream:
fun findAll(): Flow<T>
To get rid of the Flow type, we would need to collect it; otherwise, we can pass it on to the higher levels.
- If we work with pages, then we can simply use toList() to await the entire list
- If we want to stream the data progressively, then we can pass the flow up to the body of the server response.
2. WebClients from Webflux
Most real applications don't only have a database, but also perform CRUD operations over the network.
That is why we have some reactive web clients in our application.
They are written in functional style using the Reactor API.
They look like this:
val forecast: Mono<WeatherAPIResponse?> = webClient
    .get()
    .uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
    .exchangeToMono<WeatherAPIResponse?> { it.bodyToMono() }
    .doFirst {
        log.info("Getting weather forecast for {}", query)
    }
    .doOnError {
        log.error("Cannot get weather forecast for $query", it)
        Exceptions.propagate(it)
    }
    .doOnSuccess { response ->
        log.info(
            "Weather forecast for query {}: {}",
            query,
            response
        )
    }
We can use the coroutine extensions instead. The suspending calls are marked with /*S*/:
val forecastResponse: ClientResponse = webClient
    .get()
    .uri("$host/v1/forecast.json?key=$apiKey&q=$query&days=7")
    /*S*/ .awaitExchange { it }
val statusCode = forecastResponse.statusCode()
return when {
    /*S*/ statusCode.is2xxSuccessful -> forecastResponse.awaitBody()
    /*S*/ statusCode.is4xxClientError -> throw IllegalArgumentException(forecastResponse.createExceptionAndAwait())
    /*S*/ statusCode.is5xxServerError -> throw forecastResponse.createExceptionAndAwait()
    /*S*/ else -> throw forecastResponse.createExceptionAndAwait()
}
Now we are back to a direct style, and we have much more control over the kind of code that we can write.
We end up using different functions. The conversion table looks like this:
- exchangeToMono() -> awaitExchange() and its nullable variation awaitExchangeOrNull()
- exchangeToFlux() -> exchangeToFlow()
- bodyToMono() -> awaitBody() and its nullable variation awaitBodyOrNull()
- bodyToFlux() -> bodyToFlow()
...
As you can see, bodies of zero or one object can be awaited and retrieved directly,
while flows can be collected later on or handled element by element.
3. Service Layer
In the service layer, we can mark the functions as suspend and change the types from their reactive types to their "unwrapped" types:
fun findById(id: Long): Mono<WeatherInfo> {}
fun findAll(): Flux<WeatherInfo> {}
fun streamAll(): Flux<WeatherInfo> {}
should become this (streamAll() does not need to be suspending, because a Flow only does its work once it is collected):
suspend fun findById(id: Long): WeatherInfo {}
suspend fun findAll(): List<WeatherInfo> {}
fun streamAll(): Flow<WeatherInfo> {}
4. Controllers and Request Handlers
If you are using the @RestController annotation for your controller,
then Spring handles whichever return type you have.
But as you are calling suspending functions down the line, you need to make the controller functions suspending too.
@GetMapping("/{id}")
fun getById(@PathVariable id: Long): Mono<WeatherInfo> {}
will become:
@GetMapping("/{id}")
suspend fun getById(@PathVariable id: Long): WeatherInfo {}
Launching Coroutines
One important question that you might have is the following: where do we start the coroutines?
There is no launch {} anywhere in the code.
Just as with calling subscribe() yourself in Reactor, you most likely don't need to.
This is because if your controller functions are suspending, then Spring will launch them under the hood for you.
If you use the router-style approach, then you can use a coRouter instead of a normal one.
Your router will change from this:
@Bean
fun route(weatherSearchHandler: WeatherSearchHandler): RouterFunction<ServerResponse> {
    return RouterFunctions
        .route(
            POST("/weather-infos/search/")
                .and(accept(MediaType.APPLICATION_JSON)), weatherSearchHandler::searchByExample
        )
}
to this:
@Bean
fun route(weatherSearchHandler: WeatherSearchHandler) = coRouter {
    // Pass the accept predicate to POST so that it is actually part of the route
    POST("/weather-infos/search/", accept(MediaType.APPLICATION_JSON), weatherSearchHandler::searchByExample)
}
Having reached this point, we have finally migrated everything:
we have a reactive application while keeping our usual writing style.
Takeaways
- Reactive libraries are interoperable because they work with the same abstractions
- Kotlin is a great language to work with, on reactive projects and beyond
- Improving such projects can be done incrementally
- Coroutines allow us to keep our application reactive, but without the functional code style limitation
- While using suspending functions, we can work with normal types, and not with the reactive ones
Extra Resources:
- Kotlin channels for streaming data
- From Reactor to Coroutines by Nicolas Fränkel
- Concurrency is not Parallelism by Rob Pike
- Deep Dive into Coroutines on JVM by Roman Elizarov
- Java sockets I/O: blocking, non-blocking and asynchronous by Aliaksandr Liakh