
Michael Mavris
Originally published at rockandnull.com

Into the Flow: Kotlin cold streams primer

If you spend any time in the Kotlin community, you must have heard about Kotlin Flows. I certainly had, but I never spent the time to look into them. Lately, when I was about to start a new Android project, I decided it was finally time. Maybe everybody was talking about them for a reason.

Why Flows

We already have suspending functions, so we can do async work without blocking, for instance with suspend fun doSomething(): List<Something>. But that means all the results must be computed before we can start processing any of them. With fun doSomething(): Flow<Something> we can process each result as soon as it is ready, still in a non-blocking way.
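To make the difference concrete, here is a minimal sketch. The names loadAll and loadEach are hypothetical (they are not from any library), delay stands in for slow work, and runBlocking is used only so the example runs outside an Android project.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending version: the caller gets nothing until the whole list is built.
suspend fun loadAll(): List<Int> {
    val results = mutableListOf<Int>()
    for (i in 1..3) {
        delay(100) // stands in for slow work
        results.add(i)
    }
    return results
}

// Hypothetical Flow version: each value reaches the collector as soon as it is emitted.
fun loadEach(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // Printed once, after all three items are ready.
    println("list: ${loadAll()}")
    // Printed one by one, as each item is emitted.
    loadEach().collect { println("flow: $it") }
}
```

Both versions produce the same three values; the only difference is when the caller gets to see them.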

It's Cold Flows actually

You might have noticed that the function returning the Flow does not have the suspend modifier. This is because it returns immediately and doesn't do any processing. The actual work happens when the flow is "collected" (or, more generally, when a terminal operator is applied to the flow), e.g. flow.collect { something -> println(something) }. Every time we call collect, the flow code is executed again.

It's referred to as "cold", because the flow (or stream if you like) doesn't exist before collect is called and is created again every time collect is called. If the flow was there anyway and we just listened to it, then it would be "hot".
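A small sketch of what "cold" means in practice. The builderRuns counter and the numbers function are made up for this illustration; the point is only that the builder block runs once per collect call and not at all before the first one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times the builder block has been executed (illustration only).
var builderRuns = 0

fun numbers(): Flow<Int> = flow {
    builderRuns++
    emit(1)
    emit(2)
}

fun main() = runBlocking<Unit> {
    val cold = numbers() // creating the flow does not run the builder
    println("runs after creation: $builderRuns")

    cold.collect { println("first collector got $it") }
    cold.collect { println("second collector got $it") }

    // Each collect call executed the builder again.
    println("runs after two collects: $builderRuns")
}
```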

Create and terminate a Flow

The most basic way to create a Flow is using the flow { ... } builder (unexpected, right?)

fun doSomething(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

A new item in the flow can be "emitted" using emit.

Code inside the flow { ... } builder block can suspend (delay is a suspend function). But, as we said before, doSomething() is not a suspend function and doesn't block.

val something = doSomething()

viewModelScope.launch {
    something.collect { value -> println(value) }
}

On the other hand, collect is a suspend function, so it must be called from a coroutine (here, one launched in viewModelScope) or from another suspend function.

Other ways to create a Flow include converting regular collections and ranges (e.g. listOf(1, 2, 3).asFlow() or (1..3).asFlow()) or wrapping a fixed set of items (e.g. flowOf(1, 2, 3)).

collect is just one way to terminate a Flow. Other terminal operators include first (take only the first value), single (ensure exactly one value is emitted), reduce, fold, and toList.
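The builders and terminal operators mentioned above can be tried side by side. This is only a sketch with made-up values, and runBlocking is there just to make it runnable outside Android.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val fromList = listOf(1, 2, 3).asFlow() // from a collection
    val fromRange = (1..3).asFlow()         // from a range
    val fromValues = flowOf(1, 2, 3)        // from a fixed set of items

    println(fromList.toList())                          // [1, 2, 3]
    println(fromRange.first())                          // 1
    println(flowOf(42).single())                        // 42
    println(fromValues.reduce { acc, v -> acc + v })    // 6
    println(fromValues.fold(10) { acc, v -> acc + v })  // 16
}
```

Note that single throws if the flow is empty or emits more than one value, which is why it is applied to a one-item flow here.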

Transforming Flows

Transforming is quite similar to how you would transform a regular collection. Familiar operators such as map, filter, take exist. I think there are 2 major differences from the regular collection transformation:

  • Code inside those transformation functions can suspend
  • You don't have to return a single value, you can call emit as many times as you want using transform:
(1..3).asFlow() 
    .transform { number ->
        emit(number*2)
        delay(100)
        emit(number*4) 
    }
    .collect { number -> println(number) }
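The first difference, suspending inside a transformation, can be sketched like this. The describe function is hypothetical, and its delay stands in for a network or database call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending lookup (illustration only).
suspend fun describe(n: Int): String {
    delay(50)
    return "item $n"
}

fun descriptions(): Flow<String> =
    (1..10).asFlow()
        .filter { it % 2 == 0 }  // keeps 2, 4, 6, 8, 10
        .take(2)                 // keeps 2, 4 and then stops the upstream
        .map { describe(it) }    // a suspending call inside map is fine

fun main() = runBlocking<Unit> {
    descriptions().collect { println(it) } // item 2, item 4
}
```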

Coroutine Scope

When we collect a Flow we are executing a suspend function. This means that, by default, both the code inside flow { ... } and the code inside collect { ... } run in the coroutine context of the caller. But there are cases where we want those 2 code blocks to run in different coroutine contexts (usually on different Dispatchers, to avoid blocking the Main/UI thread for instance).

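We shouldn't use the familiar withContext(Dispatchers.IO) inside flow { ... } in this case: emitting from a different context than the collector's makes the flow fail with an IllegalStateException. Flows provide a dedicated flowOn operator for this, which changes the context of everything upstream of it.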

fun doSomething(): Flow<Int> = flow { 
    // This runs on Dispatchers.IO
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}.flowOn(Dispatchers.IO)

[...]

viewModelScope.launch {
    doSomething().collect { value ->
        // This runs on Main thread and just prints those values
        println(value)
    } 
}    
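To see why withContext is the wrong tool here, the following sketch collects two hypothetical flows (the function names are made up). The first one emits from inside withContext and is expected to fail with an IllegalStateException; the second one declares the same context switch with flowOn. runBlocking replaces viewModelScope so the example runs on a plain JVM.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits from inside withContext, i.e. from a different context than the collector's.
fun emitsFromWrongContext(): Flow<Int> = flow {
    withContext(Dispatchers.IO) {
        emit(1)
    }
}

// Same emission, but the context switch is declared with flowOn.
fun emitsWithFlowOn(): Flow<Int> = flow {
    emit(1)
}.flowOn(Dispatchers.IO)

fun main() = runBlocking<Unit> {
    try {
        emitsFromWrongContext().collect { println(it) }
    } catch (e: IllegalStateException) {
        println("withContext inside flow { } failed with an IllegalStateException")
    }
    emitsWithFlowOn().collect { println("flowOn version emitted $it") }
}
```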

Composing Flows

One of the most powerful things you can do with Flows is to compose them in different ways.

  • zip them, i.e. combine the 1st item of Flow A with the 1st item of Flow B, the 2nd with the 2nd, etc.
val flowA = (1..3).asFlow()
val flowB = flowOf("one", "two", "three")
flowA.zip(flowB) { a, b -> "$a and $b" }
     .collect { println(it) }

// Output:
// 1 and one
// 2 and two
// 3 and three
  • combine them, i.e. every time either Flow emits a new value, combine it with the latest value of the other Flow
val flowA = (1..3).asFlow()
val flowB = flowOf("single item")
flowA.combine(flowB) { a, b -> "$a and $b" }
     .collect { println(it) }

// Typical output (with combine, the exact pairs depend on emission timing):
// 1 and single item
// 2 and single item
// 3 and single item
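The difference between the two becomes easier to see when the flows have different lengths and emit at different speeds. The sketch below uses made-up values and delays; zipped and combined are hypothetical helper names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// zip pairs items by position and completes when the shorter flow completes.
fun zipped(): Flow<String> =
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }

// combine emits whenever either side emits, using the latest value of the other side.
fun combined(): Flow<String> {
    val numbers = flowOf(1, 2).onEach { delay(100) }
    val letters = flowOf("a", "b", "c").onEach { delay(150) }
    return numbers.combine(letters) { n, s -> "$n$s" }
}

fun main() = runBlocking<Unit> {
    println(zipped().toList()) // [1a, 2b]: the 3 has no partner and is dropped

    // With these delays the expected sequence is 1a, 2a, 2b, 2c,
    // but intermediate pairs depend on timing; only the last one is fixed.
    combined().collect { println(it) }
}
```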

Flattening Flows

Ending up with a Flow<Flow<X>>, where each item of a Flow is transformed into another "sub"-Flow, is quite common. Similarly to collections, there are a number of flat* operations.

  • flatMapConcat for waiting for each "sub"-Flow to complete before collecting (i.e. collecting Flows sequentially)
  • flatMapMerge for collecting whatever item is emitted (i.e. collecting the Flows in parallel)
  • flatMapLatest for collecting the latest "sub"-Flow that is emitted, while canceling the previous "sub"-Flow
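Here is a sketch that runs the three operators on the same input. requestFlow and ids are hypothetical helpers and the delays are arbitrary. Depending on your kotlinx.coroutines version, some of these operators may be marked as experimental or preview and produce opt-in warnings.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical "sub"-Flow: two values per id, with a pause in between.
fun requestFlow(id: Int): Flow<String> = flow {
    emit("$id: first")
    delay(100)
    emit("$id: second")
}

// Upstream flow: emits 1, 2, 3 with a short pause before each.
fun ids(): Flow<Int> = (1..3).asFlow().onEach { delay(20) }

fun main() = runBlocking<Unit> {
    // Sequential: both values for id 1, then id 2, then id 3.
    println(ids().flatMapConcat { requestFlow(it) }.toList())

    // Concurrent: all six values arrive, interleaved according to timing.
    println(ids().flatMapMerge { requestFlow(it) }.toList())

    // Latest only: a new id cancels the previous "sub"-Flow,
    // so only id 3 gets to emit its second value.
    println(ids().flatMapLatest { requestFlow(it) }.toList())
}
```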

Exceptions

Whatever exception is thrown in flow { ... } or collect { ... } can be caught with a regular try/catch statement.

But there is a more declarative way, with the catch operator. Note that when you "catch" the exception, you can emit again, throw, log, etc.

doSomethingThatMightThrow()
    .catch { e -> emit("Caught $e but emitting something else") }
    .collect { value -> println(value) }

Another important thing to remember is that this operator is "upstream" only, i.e. it does not catch anything thrown after the point where it is declared.

doSomethingThatMightThrow()
    .catch { e -> emit("Caught $e but emitting something else") }
    .collect { 
        // If something goes wrong here, it will not be caught
    }
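Both cases can be checked with a runnable sketch. failingFlow is a hypothetical flow that throws after its first value, and the exception messages are made up.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow that fails after emitting one value.
fun failingFlow(): Flow<String> = flow {
    emit("first")
    throw IllegalStateException("upstream failure")
}

fun main() = runBlocking<Unit> {
    // Upstream failure: catch sees it and emits a replacement value.
    failingFlow()
        .catch { e -> emit("recovered from ${e.message}") }
        .collect { println(it) } // first, recovered from upstream failure

    // Downstream failure: thrown in collect, after catch, so catch never sees it.
    try {
        flowOf("ok")
            .catch { emit("never reached") }
            .collect { throw IllegalStateException("collector failure") }
    } catch (e: IllegalStateException) {
        println("escaped the catch operator: ${e.message}")
    }
}
```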

My impression after looking into Flows is that they can handle more than the basic "cold stream" needs. They are not as comprehensive and powerful as other reactive libraries (e.g. RxJava), but you can always add one of those libraries to your project later if needed (converters between Flows and other reactive libraries are provided).

For those of you wanting to read more on the subject, I would recommend the official docs and these blog posts with insights into the design of Flows (1, 2).
