Daniele Bottillo

RxJava, a story about delay and schedulers

RxJava is a beast. It has a steep learning curve, but it lets you do complicated things in very few lines of code. That comes with a price: you need to understand how its internals work, otherwise it will bite you very hard!

Recently I found a very strange behaviour with the delay operator.

Let’s assume we have an observable that we want to delay before it finishes:

Single.fromCallable {
   Thread.sleep(DELAY)
   "done!"
} 
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe {
   view.showText("started!")
}
.doOnSuccess {
   view.showText(it)
}.subscribe()

Nothing strange here, we have a callable that will wait for a delay before returning the “done!” string. 

But we can write a better version with the delay operator:

Single.fromCallable {
   "done!"
}
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.delay(DELAY, TimeUnit.MILLISECONDS)
.doOnSubscribe {
   view.showText("started!")
}.doOnSuccess {
   view.showText(it)
}
.subscribe()

Seems legit, we have removed that ugly Thread.sleep() inside fromCallable and now we use the delay operator, nice!

But if you run that code it will crash badly:

io.reactivex.exceptions.OnErrorNotImplementedException: 
Only the original thread that created a view hierarchy can touch its views.

Wait, what? It seems that it is trying to update the view from the wrong thread! I usually check the documentation in this case, but it’s not very helpful:

http://reactivex.io/documentation/operators/delay.html 

It doesn’t mention anything about threads. But what you can do is look at the implementation!

So if you follow the delay operator into the source code, you will get to:

/**
* Delays the emission of the success signal from the current Single by the specified amount.
* An error signal will not be delayed.
*
* Scheduler:
* {@code delay} operates by default on the {@code computation} {@link Scheduler}.
* 
* @param time the amount of time the success signal should be delayed for
* @param unit the time unit
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit) {
    return delay(time, unit, Schedulers.computation(), false);
}

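Wait, it says that by default it operates on the computation scheduler! That’s it: delay re-emits the value on the computation scheduler, and because it sits after observeOn in the chain, everything downstream of it (including doOnSuccess) runs on a computation thread. So we are trying to update the view from the wrong thread.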
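To see the thread switch without an Android device, here is a minimal sketch that runs on a plain JVM with RxJava 2 on the classpath. The function name threadAfterDefaultDelay and the 100 ms delay are made up for illustration; the block only records which thread doOnSuccess runs on after the default delay.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the name of the thread on which doOnSuccess
// runs when delay() is used without an explicit scheduler.
fun threadAfterDefaultDelay(): String {
    var observedThread = ""
    Single.fromCallable { "done!" }
        .subscribeOn(Schedulers.io())
        // No scheduler argument: delay() uses Schedulers.computation()
        .delay(100, TimeUnit.MILLISECONDS)
        .doOnSuccess { observedThread = Thread.currentThread().name }
        // Block the calling thread until the delayed value arrives
        .blockingGet()
    return observedThread
}

fun main() {
    // Prints a thread name that starts with "RxComputationThreadPool"
    println(threadAfterDefaultDelay())
}
```

Even though the source is subscribed on the io scheduler, the value reaches doOnSuccess on a computation thread, which is exactly what breaks the view update above.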

Luckily there is another method that we can use:

public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler)

Perfect, we can rewrite our code using this alternative delay operator:

Single.fromCallable {
   "done!"
}
.subscribeOn(Schedulers.io())
.delay(DELAY, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.doOnSubscribe {
   view.showText("started!")
}
.doOnSuccess {
   view.showText(it)
}
.subscribe()

Notice that you no longer need observeOn, because we are choosing the thread in the delay operator itself.
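As a rough comparison, the sketch below puts this fix next to one possible alternative that the code above does not use: keeping the default delay and moving observeOn after it. It runs on a plain JVM, so Schedulers.single() is only a stand-in for AndroidSchedulers.mainThread(), and both function names are hypothetical.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Option A (the fix used above): pass the target scheduler to delay() directly.
fun threadWithDelayScheduler(target: Scheduler): String {
    var observedThread = ""
    Single.fromCallable { "done!" }
        .subscribeOn(Schedulers.io())
        .delay(100, TimeUnit.MILLISECONDS, target)
        .doOnSuccess { observedThread = Thread.currentThread().name }
        .blockingGet()
    return observedThread
}

// Option B (an alternative, not from the article): keep the default delay(),
// which emits on computation, then hop to the target scheduler afterwards.
fun threadWithObserveOnAfterDelay(target: Scheduler): String {
    var observedThread = ""
    Single.fromCallable { "done!" }
        .subscribeOn(Schedulers.io())
        .delay(100, TimeUnit.MILLISECONDS)
        .observeOn(target)
        .doOnSuccess { observedThread = Thread.currentThread().name }
        .blockingGet()
    return observedThread
}

fun main() {
    // Assumption: Schedulers.single() stands in for AndroidSchedulers.mainThread()
    val target = Schedulers.single()
    println(threadWithDelayScheduler(target))      // name starts with "RxSingleScheduler"
    println(threadWithObserveOnAfterDelay(target)) // name starts with "RxSingleScheduler"
}
```

Both variants deliver the value on the target scheduler. Option A saves one operator, while Option B makes the thread hop explicit, so which one reads better is largely a matter of taste.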

You can find a working example on my GitHub: https://github.com/dbottillo/Blog/blob/rxjava_delay

Happy RxJava!