Mihai Bojin

Originally published at mihaibojin.com

Subscribing to concurrent property updates

Concurrency is hard. I know this is a truism, but it had to be said!

I spent some time in the past week implementing a mechanism for synchronizing multiple Props and returning them to the caller, all at once.

On the surface, this felt like an easy task.

Each Prop<T> is backed by an AtomicReference<T>, which enables the registry to safely (and concurrently) update the Prop's value.

One of the features I wanted to build in this library was to allow users to subscribe to updates. Whenever the registry receives a new value for a Prop, it should update the bound Prop and also notify any subscribers of the new value.
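To make the idea concrete, here is a minimal sketch of a Prop with subscribers. The class name `SketchProp` and its methods are my own illustration (assumptions, not the library's actual API), and notification is synchronous here to keep things simple:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a Prop<T>; not the library's real API.
class SketchProp<T> {
  private final AtomicReference<T> value = new AtomicReference<>();
  private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

  // Returns the most recently set value (null until the first update).
  T get() {
    return value.get();
  }

  // Registers a callback that is invoked with every new value.
  void subscribe(Consumer<T> subscriber) {
    subscribers.add(subscriber);
  }

  // Called by the (hypothetical) registry when a new value arrives.
  void set(T newValue) {
    value.set(newValue);
    // Synchronous notification: runs on the caller's thread, in call order.
    for (Consumer<T> subscriber : subscribers) {
      subscriber.accept(newValue);
    }
  }
}
```

As long as `set()` is called from a single thread, subscribers see values in the same order the Prop received them.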

Since atomic references back the Props, I assumed the implementation would be straightforward:

  • create two Props and bind them to a registry
  • pass the two Props to a "Pair" implementation
  • for each of those Props, subscribe to any updates
  • the subscriber would retrieve both values and send an event containing both props

This implementation would work well when all the calls are synchronous. However, that is not an option. To increase this library's performance, I chose to offload the processing part of sending updates to Java's ForkJoinPool.

Asynchronously processing these events means there are no guarantees regarding the processing order.

Imagine the following scenario:

Events:
- set value A
- notify subscribers of value A
- set value B
- notify subscribers of value B

ForkJoinPool:
- notifies subscribers of value B
- notifies subscribers of value A

Outcome:
- the Prop's value is set to B
- but the subscribers were last sent A
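
The scenario can be replayed deterministically. The sketch below does not use a real ForkJoinPool; it queues the notifications and runs them in reverse, which is one ordering the pool is free to choose. `OutOfOrderDemo` and `replay()` are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical replay of the scenario; the reversed loop simulates the pool's scheduling.
class OutOfOrderDemo {
  // Returns { final Prop value, last value seen by the subscriber }.
  static String[] replay() {
    AtomicReference<String> prop = new AtomicReference<>();
    AtomicReference<String> lastSeen = new AtomicReference<>();
    List<Runnable> pendingNotifications = new ArrayList<>();

    for (String v : List.of("A", "B")) {
      prop.set(v);                                      // set value
      pendingNotifications.add(() -> lastSeen.set(v));  // queue the notification
    }

    // Run the notifications in reverse order: B first, then A.
    for (int i = pendingNotifications.size() - 1; i >= 0; i--) {
      pendingNotifications.get(i).run();
    }
    return new String[] {prop.get(), lastSeen.get()};
  }
}
```

The Prop ends up holding B, while the subscriber's last notification carried A.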

We need a mechanism for ensuring that the Prop's value and the last value sent to its subscribers end up being the same.

One option for achieving this goal would be to synchronize the two actions and only allow for a single value to be updated and one notification to be sent. This would solve the problem but would be incredibly slow. All future value updates would need to wait until the subscribers are notified. If a slow subscriber is registered, the Prop's value would not be updateable, and any code accessing the Prop would receive the old value - not great.

A better choice is to mark down the epoch at which the value was received and track the last processed epoch when notifying subscribers. When a value is updated, we'd also increment the epoch. Any ForkJoin tasks looking to send updates to subscribers will be able to check the last processed epoch and discard any expired updates.
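Here is a rough sketch of the epoch idea, under a few assumptions of mine: a single subscriber, an `EpochProp` class and a `Versioned` holder that are made up for this example, and a small lock around delivery so that the epoch check and the callback happen as one step. Setting the value never waits on that lock; only notification tasks do.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of epoch-tagged updates; all names are illustrative only.
class EpochProp<T> {
  // Immutable value + epoch, swapped in a single atomic step.
  private static final class Versioned<V> {
    final V value;
    final long epoch;
    Versioned(V value, long epoch) { this.value = value; this.epoch = epoch; }
  }

  private final AtomicReference<Versioned<T>> current =
      new AtomicReference<>(new Versioned<>(null, 0L));
  private final Consumer<T> subscriber;
  private long lastNotifiedEpoch = 0L; // guarded by 'this'

  EpochProp(Consumer<T> subscriber) {
    this.subscriber = subscriber;
  }

  T get() {
    return current.get().value;
  }

  // Stores the value, increments the epoch, and schedules the notification.
  void set(T newValue) {
    Versioned<T> update =
        current.updateAndGet(old -> new Versioned<>(newValue, old.epoch + 1));
    ForkJoinPool.commonPool().execute(() -> deliver(update.value, update.epoch));
  }

  // Delivers the update unless a newer epoch was already delivered.
  synchronized boolean deliver(T value, long epoch) {
    if (epoch <= lastNotifiedEpoch) {
      return false; // expired update, discard it
    }
    lastNotifiedEpoch = epoch;
    subscriber.accept(value);
    return true;
  }
}
```

If the task for epoch 2 runs before the task for epoch 1, the subscriber receives only the newer value and the older notification is dropped. A slow subscriber still delays other notification tasks, but it no longer blocks `set()` or `get()`.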

Ensuring the implementation worked as expected took some effort and required a bit of trial and error. Here are a few lessons I learned along the way:

  • Write tests to reproduce your scenario and expected results, then run them repeatedly (think JUnit + repeat until failure, tens of thousands of times)
  • Sometimes, you need extra info to debug problems; you'd be tempted to use System.out.print(), but that can be misleading; it adds a few millis of lag which can totally derail your repro when the scenario you're trying to debug executes in nanoseconds.
  • System.out.print() lies: lines written by different threads can show up in a different order than the events actually happened; calling System.out.flush() helps, but is not perfect and still suffers from the orders-of-magnitude timing problem explained above
  • Two independent atomic operations do not make for one atomic operation; this is counterintuitive at first but makes total sense the more you think of it. Two threads each executing the same two operations can interleave in six different ways, and some of those leave one value written by the first thread and the other by the second. Find a way to update both values in a single atomic operation.
  • Syncing multiple props means the updates need to be ordered; I found one solution: queue the update operations and pass them to AtomicReference.updateAndGet() as UnaryOperator<Pair<T, U>>. This method uses weakCompareAndSetVolatile memory effects, which fall under the happens-before rules of the Java memory model (JLS Chapter 17); the java.util.concurrent documentation summarizes them as:

... defines the happens-before relation on memory operations such as reads and writes of shared variables. The results of a write by one thread are guaranteed to be visible to a read by another thread only if the write operation happens-before the read operation.
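
As a sketch of updating both values in one atomic operation, the example below keeps an immutable `Pair` inside a single AtomicReference and expresses each update as a `UnaryOperator`. `PairSketch`, `withFirst`, `withSecond`, and `apply` are hypothetical names I made up for this example. The operators must be side-effect-free, since `updateAndGet()` may re-apply them under contention:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical sketch; Pair and the helper names are illustrative, not the library's API.
final class PairSketch {
  // Immutable snapshot of both values; a new instance replaces the old one atomically.
  static final class Pair<T, U> {
    final T first;
    final U second;
    Pair(T first, U second) { this.first = first; this.second = second; }
  }

  // Update operations expressed as pure functions, safe to re-apply on CAS retries.
  static <T, U> UnaryOperator<Pair<T, U>> withFirst(T value) {
    return old -> new Pair<>(value, old.second);
  }

  static <T, U> UnaryOperator<Pair<T, U>> withSecond(U value) {
    return old -> new Pair<>(old.first, value);
  }

  // Applies one update operation and returns the resulting consistent snapshot.
  static <T, U> Pair<T, U> apply(
      AtomicReference<Pair<T, U>> ref, UnaryOperator<Pair<T, U>> op) {
    return ref.updateAndGet(op);
  }
}
```

Every reader of the reference sees a pair that was produced by exactly one update, never a mix of two half-applied ones.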

Since we don't care about sending each and every value, we can simply trigger an event operation when an underlying Prop is updated and only pull the latest value at the time of processing. This implementation will at least ensure some order to the logged operations, but it can't guarantee how many updates the subscriber sees (it could be one, or could be each and every one). That's acceptable for our use case.
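
One way to sketch "trigger an event, then pull the latest value at processing time" is a pending flag that allows at most one outstanding notification task. This is my own illustration under stated assumptions (a single subscriber, a hypothetical `CoalescingNotifier` class, and any `Executor`, such as `ForkJoinPool.commonPool()`), not the library's implementation:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch: updates only trigger an event; the value is read when the event runs.
class CoalescingNotifier<T> {
  private final AtomicReference<T> latest = new AtomicReference<>();
  private final AtomicBoolean pending = new AtomicBoolean(false);
  private final Executor executor;
  private final Consumer<T> subscriber;

  CoalescingNotifier(Executor executor, Consumer<T> subscriber) {
    this.executor = executor;
    this.subscriber = subscriber;
  }

  void update(T value) {
    latest.set(value);
    // Schedule at most one outstanding task; later updates piggyback on it.
    if (pending.compareAndSet(false, true)) {
      executor.execute(this::drain);
    }
  }

  // Serialized so the subscriber never handles two notifications at once.
  private synchronized void drain() {
    pending.set(false);              // updates after this point schedule a new task
    subscriber.accept(latest.get()); // pull the latest value at processing time
  }
}
```

With this approach, intermediate values may be skipped and the same value may occasionally be delivered twice, but the last notification always carries the most recent value.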

Another limitation that I had to build in was that the library needs to ensure an update is consumed before sending another update. Imagine a case where two ordered operations update multiple consumers. Due to nondeterministic thread execution, a subset of the consumers may be concurrently receiving a value while others may be receiving a different value.

This could have been avoided by having a dedicated processing thread for each Prop, polling a blocking queue and then sending any updates. Apart from being slower due to locking, creating a Thread has a cost, both in memory usage and in the maximum number of Props the system can handle at once. The ForkJoinPool does not create a new thread for each task and seems like the right tool for the job since, in most cases, we want a fire-and-forget type of situation: we want to get an update event to any subscribing consumers as fast as possible.
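
For comparison, the dedicated-thread alternative could look roughly like the sketch below. `DedicatedThreadNotifier` is a hypothetical class written for this post, and the 10 ms poll timeout is an arbitrary choice so the worker can notice shutdown:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the rejected design: one worker thread per Prop.
class DedicatedThreadNotifier<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
  private final Thread worker;
  private volatile boolean closed = false;

  DedicatedThreadNotifier(Consumer<T> subscriber) {
    worker = new Thread(() -> {
      try {
        while (true) {
          // Wait briefly for the next update; FIFO order is preserved by the queue.
          T next = queue.poll(10, TimeUnit.MILLISECONDS);
          if (next != null) {
            subscriber.accept(next);
          } else if (closed) {
            return; // queue drained and shutdown requested
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // stop on interruption
      }
    });
    worker.setDaemon(true);
    worker.start();
  }

  void update(T value) {
    queue.add(value);
  }

  // Requests shutdown and waits until all queued updates were delivered.
  void close() throws InterruptedException {
    closed = true;
    worker.join();
  }
}
```

Ordering comes for free here, at the price of one thread per Prop.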

Takeaways

  • don't assume ordering
  • test your code until breaking point
  • avoid implementing concurrency primitives from scratch unless there's a real need

If you liked this article and want to read more like it, please subscribe to my newsletter; I send one out every few weeks!
