Last week, I wrote about dashboards and celebrated 2 years of Matt's Tidbits! This time, I want to share… about sharing!
Have you ever written code that looks like this?
If so, you may have run into issues - what if a particular caller of
start() forgets to call
stop(), or possibly worse - calls it more than once? These types of problems are very common, especially in multi-threaded environments.
If you add on the layer of trying to incorporate reference counting with RxJava, the problem becomes even more complex. However, I recently discovered a rather magical operator -
From the documentation:
Returns a new
Observablethat multicasts (and shares a single subscription to) the current
Observable. As long as there is at least one
Observer, the current
Observablewill stay subscribed and keep emitting signals. When all observers have disposed, the operator will dispose the subscription to the current
What this means is that we can use this operator to cause the underlying observable to be refcounted! But, it goes beyond that - your underlying code doesn't actually have to use RxJava - we can use RxJava as a mechanism for accomplishing the refcounting, which, in my opinion, provides a slightly cleaner implementation.
Here's an example of what this could look like:
The magic underlying this solution is that we're letting RxJava perform the ref-counting for us!
Walking through the code, here are the main parts:
- Clients who want to use this would call
runSomeOperationThatStartsAndStopsAutomatically(), subscribe to the
Completablethat is returned, and dispose it when they no longer want the operation to be running.
- Under the hood, our manager class checks to see if the operation is already running - and if not, starts it (in a thread-safe way).
- When the observable is created (note it is actually an Observable - RxJava does not provide the
Completabletypes), we use the
.doOnDisposeoperators to start/stop the underlying operation. What makes this work is the call to
share(), which means that these side-effect methods only run once.
- When all clients have unsubscribed, the underlying operation is stopped,
completableis cleared out, so the next client to try to use this would start the operation up again.
This mechanism allows multiple clients to use this shared functionality and ensure it's only actually running once. One benefit to this solution is that the underlying operation could conceivably throw an error and anyone subscribed would be notified so they could react to that.
Note that this approach is still susceptible to someone forgetting to unsubscribe (therefore possibly preventing the underlying operation from shutting down) - but it's not possible for a single client to unsubscribe more than once. That's one of the main things I prefer about this pattern- it reduces the possibility of programmer error by using a more expressive API.
How have you used the
share() operator in your projects? Please share in the comments below!
Interested in working with me in the awesome Digital Products team here at Accenture? We're hiring!