A recent client engagement involved interop between Kotlin coroutines in shared code, and RxSwift on the iOS side. We did some work to ensure that this could be done in a way that is type-safe and thread-safe. Whether or not you use RxSwift, hopefully this can provide some useful patterns for interop code.
Coroutines are a Kotlin language feature that allows asynchronous code to be written in a way that looks like synchronous code, avoiding the nesting that often comes with callback-based APIs. They're available on all Kotlin platforms, but have some limitations on the native side. The release version of native coroutines is limited to single-threaded use-cases, though there are experimental releases available that are multithreaded with some risk of memory leaks. But while they're a fully-supported language feature of Kotlin, they don't translate to Objective-C and Swift.
RxSwift is a Swift implementation of the Reactive Streams specification. It's one way to handle asynchronous code on Swift, and has many operators for combining and transforming event streams. Though some of the names are different, much of the API will feel familiar to Kotlin developers who have experience with RxJava.
Since Coroutines will almost always be present in shared code, and RxSwift is a common option on the iOS side, hopefully the motivation to communicate between them is clear. So letโs start writing some code.
Common Repository
Weโll work with a dummy repository class that looks like this, defined in src/commonMain
class ThingRepository {
suspend fun getThing(succeed: Boolean): Thing {
delay(100)
if (succeed) {
return Thing(0)
} else {
error("oh no!")
}
}
fun getThingStream(count: Int, succeed: Boolean): Flow<Thing> = flow {
repeat(count) {
delay(100)
emit(Thing(it))
}
if (!succeed) error("oops!")
}
}
From the outside this looks roughly like a real repository might, but with inputs that let us control the output a bit more directly for demonstration purposes. It gives us the ability to test success and error cases for both single-event suspend
functions, and multiple-event Flow
s. The internal delays also give us a chance to cancel things in-flight so we can verify that works as expected.
Of course, if we try to consume this repository from Swift code, we run into some immediate problems. Suspend functions are not visible at all to Swift*, and while the Flow
type is visible, most APIs you might call on it are not. Further, Flow
's generic type is not visible to Swift, due to limitations in how generics are translated from Kotlin, though Objective-C, and then to Swift. Generic arguments only make it from Kotlin to Swift when defined on classes, and Flow
is an interface.
*This will be changing somewhat in Kotlin 1.4, where suspend functions will generate a callback function visible to Objective-C/Swift. However, this uses only the language-level suspend
function support, and none of the extra functionality from the kotlinx.coroutines
library, so it doesn't help with cancellation or the structured concurrency model. We want to be able to control cancellation and threading via CoroutineScope
s, so the new stuff in 1.4 won't help here.
Suspend and Flow wrappers
So, let's add some wrapper types that are easier to interact with from Swift. These are iOS-specific, so we can put them in the iosMain
source-set. First, SuspendWrapper
:
class SuspendWrapper<T>(private val suspender: suspend () -> T) {
fun subscribe(
scope: CoroutineScope,
onSuccess: (item: T) -> Unit,
onThrow: (error: Throwable) -> Unit
): Job = scope.launch {
try {
onSuccess(suspender())
} catch (error: Throwable) {
onThrow(error)
}
}
}
A couple things to highlight here:
- The
subscribe
function is a stand-in for invoking the lambda since that's impossible from Swift, but it also adds some callbacks. This lets us easily hook into the different Rx event callbacks. If we care about other parts of the Rx lifecycle like subscription and disposal we could add other lambdas here and call them at the appropriate time. - There's a
scope
parameter, which I left in to keep this architecture-agnostic. You could alternatively make the scope internal to the wrapper, if you want to assert that you always call this suspend function in a specific scope. This would be less flexible but would avoid some work on the Swift side. - The generic parameter has an
Any
upper-bound. Without this, all Swift references toT
would be nullable, even if a nonnull type were used forT
. If you need to work with nullable types, I recommend having a separateNullableSuspendWrapper
which differs only in the generic upper-bound.
But there's still an issue. There's a good chance we'll want to talk to these wrappers in a multithreaded setting. In the default, single-threaded coroutines version, this would require the success and throw callbacks to switch back to the original thread before running, which is a heavy limitation. Things are easier if we use the multithreaded coroutines branch, but we still need to do some work to make this thread-safe for Kotlin/Native.
class SuspendWrapper<T>(private val suspender: suspend () -> T) {
init {
freeze()
}
fun subscribe(
scope: CoroutineScope,
onSuccess: (item: T) -> Unit,
onThrow: (error: Throwable) -> Unit
): Job = scope.launch {
try {
onSuccess(suspender().freeze())
} catch (error: Throwable) {
onThrow(error.freeze())
}
}.freeze()
}
We've added freeze()
calls in three key places:
- On initialization, so the
SuspendWrapper
itself can cross threads - On the arguments of each callback, so that emissions or errors can be passed to other threads.
- On the
Job
returned by thesubscribe()
call, so that it can be cancelled from any thread.
The Flow
equivalent is this:
class FlowWrapper<T>(private val flow: Flow<T>) {
init {
freeze()
}
fun subscribe(
scope: CoroutineScope,
onEach: (item: T) -> Unit,
onComplete: () -> Unit,
onThrow: (error: Throwable) -> Unit
): Job = flow
.onEach { onEach(it.freeze()) }
.catch { onThrow(it.freeze()) }
.onCompletion { onComplete() }
.launchIn(scope)
.freeze()
}
This is similar to the structure of SuspendWrapper
, except that it wraps a Flow
instead of a suspend
function. An important but subtle detail is that the catch{}
call must come before onCompletion{}
, or else the RxSwift
stream will end before the error is emitted.
iOS Repository Wrapper
With these function-level wrappers in place, we'll add an iOS wrapper around our ThingRepository
that makes use of them:
class ThingRepositoryIos(private val repository: ThingRepository) {
val scope: CoroutineScope = object : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = SupervisorJob() + Dispatchers.Default
}
init {
freeze()
}
fun getThingWrapper(succeed: Boolean) =
SuspendWrapper { repository.getThing(succeed) }
fun getThingStreamWrapper(count: Int, succeed: Boolean) =
FlowWrapper(repository.getThingStream(count, succeed))
}
This piece is the unfortunate boilerplate we need in this scheme. While most of the other bridge code just has to be written once, each repository method must be explicitly wrapped into something visible from Swift, and we wrap the entire repository so we have a place to put them. At scale it would probably be preferable to codegen some version of this, but for something small it's pretty straightforward to write by hand.
Note that we freeze this class to ensure it can safely be touched from a background thread, since it likely will be.
We also declare a scope here. This will be passed as a parameter to the subscribe()
calls on SuspendWrapper
and FlowWrapper
. In this example it's using Dispatchers.Default
, which requires using the multithreaded coroutines branch.
Connecting Wrappers to Rx
Armed with our iOS-compatible repository, it's time to move over into Swift. My natural inclination as a Kotlin developer is to add extension functions to SuspendWrapper
and FlowWrapper
which connect the relevant Rx piping to the callback lambdas inside the subscribe()
function. Unfortunately, the following declaration isn't valid here:
extension SuspendWrapper {
func toSingle() -> Single<T> {
...
}
}
This leads to an error stating Extension of a generic Objective-C class cannot access the class's generic parameters at runtime
. I don't have a deep understanding here, but essentially due to the different generic models between Swift and Objective-C, the generic parameter on SuspendWrapper
is erased at runtime and so the extension can't be resolved. Luckily, we can still define top-level functions:
func createSingle<T>(
scope: Kotlinx_coroutines_coreCoroutineScope,
suspendWrapper: SuspendWrapper<T>
) -> Single<T> {
return Single<T>.create { single in
let job: Kotlinx_coroutines_coreJob = suspendWrapper.subscribe(
scope: scope,
onSuccess: { item in single(.success(item)) },
onThrow: { error in single(.error(KotlinError(error))) }
)
return Disposables.create { job.cancel(cause: nil) }
}
}
func createObservable<T>(
scope: Kotlinx_coroutines_coreCoroutineScope,
flowWrapper: FlowWrapper<T>
) -> Observable<T> {
return Observable<T>.create { observer in
let job: Kotlinx_coroutines_coreJob = flowWrapper.subscribe(
scope: scope,
onEach: { item in observer.on(.next(item)) },
onComplete: { observer.on(.completed) },
onThrow: { error in observer.on(.error(KotlinError(error))) }
)
return Disposables.create { job.cancel(cause: nil) }
}
}
These functions take in a CoroutineScope
and a SuspendWrapper
or FlowWrapper
, and create the associated Rx type Single
or Observable
. Rx events get forwarded to the callback lambdas that the wrappers defined, and the Rx disposable is linked to the coroutine Job
so that cancellation on the Rx side shuts down the coroutine as well.
By the way, a note on errors
You might note the KotlinError
class referenced here. This is a simple wrapper to pass Kotlin exceptions into the Rx error stream, and forward the error message.
class KotlinError: LocalizedError {
let throwable: KotlinThrowable
init(_ throwable: KotlinThrowable) {
self.throwable = throwable
}
var errorDescription: String? {
get { throwable.message }
}
}
This lets us read out the Kotlin error message from Swift by reading the localizedDescription
field on the error, without needing to check its type. This helps avoid extra Kotlin-specific error-handling in Swift.
Usage From Elsewhere in iOS App
The rest of the Swift code can now talk to these observables, like so:
let disposable = createObservable(
scope: repository.scope,
flowWrapper: repository.getThingStreamWrapper(count: 3, succeed: true)
).subscribe(
onNext: { thing in NSLog("next: \(thing)") },
onError: { error in NSLog("error: \(error.localizedDescription)") },
onCompleted: { NSLog("complete") },
onDisposed: { NSLog("disposed") }
)
This will print
next: Thing(count=0)
next: Thing(count=1)
next: Thing(count=2)
complete!
disposed!
So there you have it: type-safe and thread-safe communication from Swift with coroutine APIs defined in our shared Kotlin, via RxSwift.
Next Steps
I didn't emphasize it very heavily above, but only the Swift side of this setup has any knowledge of RxSwift. So if you want to do something similar with some other reactive API (I hear Combine is interesting), you could probably do so without needing to change anything on the Kotlin side.
If you'd like to explore further, you can find the code at the following repo:
Note: The repository has since been updated for the sequel to this blog post, but you can find the state of the code when this post was originally written in the
v1
branch
This includes a few extensions on what was presented here:
- Both nullable and nonnull versions of all Kotlin wrapper types and Swift wrapper functions
- Swift unit tests to verify everything is working
- An extra
jobCallback
parameter inwrapSingle()
and its siblings so that the Swift tests can pull the internally-createdJob
out and verify it gets cancelled when expected.
Hopefully this tour of RxSwift and Coroutines interop was interesting and helpful to you. And if you'd like to go deeper, you can always reach out to Touchlab!
Top comments (0)