備忘を兼ねてこんにちは
タイトルは大げさで、非同期処理に置いてArrowを使ってみても良いかもしれない 程度です。
まずは使ってみる
@Throws(Exception::class)
fun tooSlowMethod():Result = ...
Single.create<Result> { emitter ->
try {
emitter.onSuccess(tooSlowMethod())
} catch (e: Exception) {
emitter.onError(e)
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
..
}, {
..
})
これを書き直していきましょう。
まずはただcoroutine使ってみただけ。
launch(UI) {
try {
val res = withContext(DefaultDispatcher) { tooSlowMethod() }
..
} catch (e: Exception) {
..
}
}
arrow版(そしてこれはまだ未完成)
DeferredK { tooSlowMethod() } // or async { tooSlowMethod() }.k() etc...
.unsafeRunAsync{ either ->
either.fold(
..
},{
..
})
書き方は色々とありますが、とりあえずこんな感じでしょうか??
ただし次の条件で一気に弱くなります…
fun isMainThread() = Looper.myLooper() == Looper.getMainLooper()
fun successProcess(result: Result) {
if (!isMainThread()) throw RuntimeException("\(^o^)/")
Log.d("test", "success $result")
}
fun errorProcess(e: Throwable) {
if (!isMainThread()) throw RuntimeException("\(^o^)/")
Log.d("test", "error", e)
}
Single.create<Result> { emitter ->
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
successProcess(it)
}, {
errorProcess(it)
})
launch(UI) {
try {
val res = withContext(DefaultDispatcher) { tooSlowMethod() }
successProcess(res)
} catch (e: Exception) {
errorProcess(e)
}
}
// \(^o^)/
DeferredK{...}
.unsafeRunAsync({successProcess(it)}, {errorProcess(it)})
Androidあるあるですね。
/**
* 実装はunsafeAsyncRunとほぼ同じだが、Callback時のRunContextを選択できるようにしている
* どうせ今回のケースだとEitherに対してfoldする(偏見)し、そこも展開してみた
* Eitherに対してパターンマッチで…の場合もあるかもなので、お好みで
*/
fun <A> DeferredKOf<A>.unsafeRunAsyncWrapper(runContext: CoroutineContext, ifError: (Throwable) -> Unit, ifSuccess: (A) -> Unit) {
async(Unconfined, CoroutineStart.DEFAULT) {
Try { await() }.fold(
{ withContext(runContext) { ifError(it) } },
{ withContext(runContext) { ifSuccess(it) } })
}.let {
it.invokeOnCompletion { a: Throwable? ->
if (a != null) throw a
}
}
}
DeferredK {
tooSlowMethod()
}.unsafeRunAsyncWrapper(UI, { errorProcess(it) }, { successProcess(it) })
なんかすっごいRxっぽく(?)なりましたね。
むしろasyncとRxの良いとこ取りっぽく感じませんか(感じませんね)
もうちょっと使ってみる
基本的に現実は厳しいので、もうちょっと現実に即してみましょう。
fun slowMethodA(): ResultA {
if (isMainThread()) throw RuntimeException("\(^o^)/")
..
}
fun slowMethodB(): ResultB {
if (isMainThread()) throw RuntimeException("\(^o^)/")
..
}
fun makeResult(a: ResultA, b: ResultB): Result {
..
}
// DeferredKによって作られたThread上で動く
fun run1(): DeferredK<Result> {
return DeferredK {
Log.d("test", "pre a running thread is ${Thread.currentThread()}")
val a = slowMethodA()
Log.d("test", "pre b running thread is ${Thread.currentThread()}")
val b = slowMethodB()
Log.d("test", "pre makeResult running thread is ${Thread.currentThread()}")
makeResult(a, b)
}
}
// 呼び出し側で作られたThreadで始まり、DeferredKで作られたThreadに切り替わりながら動く
fun run2(): DeferredK<Result> =
ForDeferredK extensions {
binding {
Log.d("test", "pre a running thread is ${Thread.currentThread()}")
val a = DeferredK { slowMethodA() }.bind()
Log.d("test", "pre b running thread is ${Thread.currentThread()}")
val b = DeferredK { slowMethodB() }.bind()
Log.d("test", "pre makeResult running thread is ${Thread.currentThread()}")
makeResult(a, b)
}.fix()
}
// 呼び出し側がmain threadだと死ぬ
fun run3(): DeferredK<Result> =
ForDeferredK extensions {
binding {
Log.d("test", "pre a running thread is ${Thread.currentThread()}")
val a = DeferredK.just(slowMethodA()).bind()
Log.d("test", "pre b running thread is ${Thread.currentThread()}")
val b = DeferredK.just(slowMethodB()).bind()
Log.d("test", "pre makeResult running thread is ${Thread.currentThread()}")
makeResult(a, b)
}.fix()
}
// bindingの中身は呼び出し側だが、各slowMethod内はCommonPoolを利用したものになる
fun run4(): DeferredK<Result> =
ForDeferredK extensions {
binding {
Log.d("test", "pre a running thread is ${Thread.currentThread()}")
val a = { DeferredK.just(slowMethodA()) }.bindIn(CommonPool).bind()
Log.d("test", "pre b running thread is ${Thread.currentThread()}")
val b = { DeferredK.just(slowMethodB()) }.bindIn(CommonPool).bind()
Log.d("test", "pre makeResult running thread is ${Thread.currentThread()}")
makeResult(a, b)
}.fix()
}
突然のFor構文出てきましたが、あえて何も説明しません。
あれです、あれ。
なんか書けって言われたら考えますので雑に連絡ください。
さて、各々の違いは各メソッドのコメントに書いてあるとおりです。
これらの処理の共通点は
- すべて直列に動く
です
あとmakeResultによる結果も(副作用なければ)全部一緒です。
書き方によって動作するThreadが違いますので、その点をちゃんと考慮しながら…
というのが必要かと思います。
まあでも直列だけじゃなくて当然並列に動かしたいですよね…?
fun runParallels1(): DeferredK<Result> {
val a = DeferredK { slowMethodA() }
val b = DeferredK { slowMethodB() }
return a.flatMap { aa -> b.map { makeResult(aa, it) } }
}
fun runParallels2(): DeferredK<Result> =
ForDeferredK extensions {
binding {
val a: DeferredK<ResultA> = DeferredK { slowMethodA() }
val b: DeferredK<ResultB> = DeferredK { slowMethodB() }
makeResult(a.bind(), b.bind())
}.fix()
}
slowMethodA/Bがthrowを返すようなパターンだった場合のHandlingがチョットややこしくなるかと思います。
またあえて型を明示しましたが、bindしたほうがResultA/Bを直接見れて、For構文的な嬉しさを感じるかと思います。
並列動作版は呼び出しメソッドが単体で動いて且つ失敗しないようなもののときに検討すると良いんじゃないでしょうか?
(別にDeferredKに限った話ではない)
最後に
これだけだとArrowをAndroidに突っ込む動機としては弱いと思います。
ただ、
- kotlinのDeferredにmap/flatMapとかとか欲しいなー
- 非同期処理だけだとRx重厚(お前が言うな)だよなぁ…
ってチョットでも思ったなら少し触ってみる価値はあると思います。
気が向いたらEitherTransformerあたりの例でも書きます。
待ちきれない…って人は、公式Documentが良いと思います。
ていうか公式Documentが良いです。
その他Arrowのアプリ導入相談とかお受けしますので、お気軽に。(そんなやつ居るか?)
Reference
- Arrowの公式ドキュメントのkotlinxcoroutinesの項目
-
For構文のbind/bindInとか気になる人向け
- ちなみにFor構文そのものはこのMonadContinuationsが頑張っているから出来ている感じですね(雑にしか読んでないけど)
- 完全なサンプル
Top comments (2)
I didn't understand a thing! But you mentioned Arrow, so here's a 🦄
🙌
Polymorphic programs with Arrow 🚀