Java Executors are flexible and convenient for the vast majority of use cases. They are especially well suited to the traditional threading model, where the application executes rather long-running (or even blocking) tasks. The situation changes when there is a need to handle a huge number of tiny tasks. The Java Platform developers ran into this issue as well and designed ForkJoinPool for exactly this use case. And ForkJoinPool performs very well.
But sometimes even this is not enough. Some use cases require a scheduler with as little overhead as possible. ForkJoinPool implements a quite sophisticated "work stealing" mechanism, and that mechanism has its own price.
Analyzing ForkJoinPool Design
To see where the issues may come from, let's take a look at the ForkJoinPool internals (the picture is taken from this article):
So, each worker thread has its own task queue. There is also a common task queue shared between threads. Each worker processes tasks from its own queue. When there are no tasks left, it looks for tasks in other workers' queues. And when those are empty too, the worker starts picking tasks from the shared queue.
The ForkJoinPool design is quite good for parallel stream processing and for asynchronous processing with CompletableFuture. Unfortunately, this design has a few issues when there is a huge number of tiny tasks. The shared incoming queue becomes a congestion point. To utilize the full CPU power, ForkJoinPool usually uses as many workers as there are CPU cores (minus one). As the number of cores increases (as it does in modern CPUs), the issue gets worse. A similar thing happens when workers try to "steal" work from another thread.
It is worth noting that a similar issue exists in all Executor implementations present in the Java Standard Library.
Finding Right Solution
The simplest way to avoid a single congestion point is to use a separate queue per worker and submit tasks directly to the worker's queue. A simple round-robin queue selection mechanism can't prevent contention completely, but it works quite well and distributes incoming tasks fairly evenly.
My initial approach looked like this:
Unfortunately, this approach has one inherent limitation: the thread that submits a task and the worker thread need to access the same queue, so the queue must be one of the concurrent queues. And while contention is much lower with several queues, it still exists. Concurrent queues are heavily optimized and very fast, but they still carry the overhead of concurrent access.
To minimize the impact of concurrent access, I changed the approach. Instead of a single queue per worker, I used two queues. One, called the input queue, received incoming tasks. The second, called the work queue, was processed by the worker thread. Once the work queue is emptied, the worker atomically swaps the input queue and the work queue and starts processing the new work queue.
This approach improved performance somewhat, as there is almost no time when more than one thread accesses each queue, and the work queue is accessed almost exclusively by the worker. Nevertheless, the difference was rather small, as both queues still need to be concurrent, with all the relevant overhead.
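The queue-per-worker idea with round-robin submission can be sketched roughly as follows. This is only an illustration of the approach described above, not the actual implementation: the class name `RoundRobinExecutor`, the use of `ConcurrentLinkedQueue`, and the `parkNanos` idle strategy are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch: one concurrent queue per worker, round-robin submission.
final class RoundRobinExecutor {
    private final List<Queue<Runnable>> queues = new ArrayList<>();
    private final List<Thread> workers = new ArrayList<>();
    private final AtomicLong counter = new AtomicLong();
    private volatile boolean running = true;

    RoundRobinExecutor(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
            queues.add(queue);
            Thread worker = new Thread(() -> runWorker(queue), "rr-worker-" + i);
            worker.setDaemon(true);
            workers.add(worker);
        }
        workers.forEach(Thread::start);
    }

    // Picks the next queue in round-robin order; submitters only contend
    // on the counter and on the tail of the selected queue.
    void submit(Runnable task) {
        int index = (int) Math.floorMod(counter.getAndIncrement(), (long) queues.size());
        queues.get(index).add(task);
    }

    private void runWorker(Queue<Runnable> queue) {
        // Keep draining after shutdown was requested until the queue is empty.
        while (running || !queue.isEmpty()) {
            Runnable task = queue.poll();
            if (task == null) {
                LockSupport.parkNanos(1_000); // assumed idle strategy
            } else {
                task.run();
            }
        }
    }

    // Tasks submitted after shutdown() is called may never run.
    void shutdown() {
        running = false;
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Each queue here is still shared between the submitting threads and one worker, which is exactly why a concurrent queue is required.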
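A minimal sketch of the two-queue swap for a single worker might look like the code below. The class and method names are hypothetical, and `ConcurrentLinkedQueue` is an assumed choice. Note that a submitter may still hold a reference to a queue that has just been swapped out, which is why both queues have to stay concurrent; a task added that late is simply picked up after a later swap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the pair of queues owned by one worker.
final class SwappingQueues {
    // Submitters always add to whichever queue is currently published here.
    private final AtomicReference<Queue<Runnable>> input =
            new AtomicReference<>(new ConcurrentLinkedQueue<Runnable>());
    // Touched only by the owning worker thread.
    private Queue<Runnable> work = new ConcurrentLinkedQueue<>();

    void submit(Runnable task) {
        input.get().add(task);
    }

    // Must be called by the owning worker thread only.
    // Swaps the queues atomically, then runs everything collected so far.
    int swapAndRun() {
        work = input.getAndSet(work);
        int executed = 0;
        Runnable task;
        while ((task = work.poll()) != null) {
            task.run();
            executed++;
        }
        return executed;
    }

    public static void main(String[] args) {
        SwappingQueues queues = new SwappingQueues();
        List<String> log = new ArrayList<>();
        queues.submit(() -> log.add("a"));
        queues.submit(() -> log.add("b"));
        int executed = queues.swapAndRun();
        System.out.println(executed + " " + log); // prints "2 [a, b]"
    }
}
```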
Replacing Queue With ... Stack!
While the last approach didn't provide a significant performance improvement, it made it possible to look at the processing from a different angle: the queues are not used as queues anymore. Instead, the input queue is just temporary storage for incoming tasks. Such storage doesn't have to be a queue, so it's possible to use a simpler data structure with lower concurrent access overhead. It turned out that such a structure already exists (although it is not part of the Java Standard Library). The structure is called a Treiber stack, and it has very low concurrent access overhead, since adding an element to the stack is just one Compare-And-Swap operation (retried only if another thread changed the head in the meantime). There is one additional advantage: internally, the data is stored in a simple linked list, which can be accessed directly, without any concurrency mechanisms. Of course, this works only if we can guarantee that the list is accessed by a single thread at a time.
The structure has one disadvantage, though: since this is a stack and the data is accessed in Last-In-First-Out (LIFO) order, tasks get processed in the reverse of their submission order. This might not be an issue, but it might be unfair to the tasks that were added first. Fortunately, this is easy to fix, as reversing a linked list is very simple and can be done in place.
The final structure looks like this:
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class StackingCollector<T> {
    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    private StackingCollector() {}

    public static <T> StackingCollector<T> stackingCollector() {
        return new StackingCollector<>();
    }

    // Lock-free push: link the new node to the current head and try to publish it.
    public void push(final T action) {
        final var newHead = new Node<>(action);
        Node<T> oldHead;

        do {
            oldHead = head.get();
            newHead.nextNode = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }

    // Returns true if at least one element was processed.
    public boolean swapAndApplyFIFO(final Consumer<T> consumer) {
        // Note: this is a very performance-critical method, so all internals are inlined
        Node<T> head;

        // Detach stored data from head
        do {
            head = this.head.get();
        } while (!this.head.compareAndSet(head, null));

        // Reverse the detached list in place, so elements come out in submission order
        Node<T> current = head;
        Node<T> prev = null;
        Node<T> next = null;

        while (current != null) {
            next = current.nextNode;
            current.nextNode = prev;
            prev = current;
            current = next;
        }

        final var hasElements = prev != null;

        // Process elements; the list is private to this thread at this point
        while (prev != null) {
            consumer.accept(prev.element);
            prev = prev.nextNode;
        }

        return hasElements;
    }

    static final class Node<T> {
        public T element;
        public Node<T> nextNode;

        public Node(final T element) {
            this.element = element;
        }
    }
}
The usage is very simple: push data via the push() method and from time to time call swapAndApplyFIFO(), which processes the elements collected so far. Note that push() and swapAndApplyFIFO() do not contend once the head is detached, so new data can be pushed while the previously collected data is being processed.
The StackingCollector replaces both working queues in the executor, and the whole architecture now looks like this:
This is mostly similar to the initial diagram with a queue per worker, except that there is no simultaneous access to the data. Once swapAndApplyFIFO() replaces the stack head, it gets exclusive access to the collected data, so it can process the elements without any synchronization overhead.
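As a small illustration of that usage, the snippet below pushes three elements and then drains them. To keep it compilable on its own, it carries a condensed copy of the collector; that copy detaches the head with `getAndSet(null)`, which is a simplification assumed here to be equivalent to the CAS loop in the original. The `StackingCollectorDemo` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Condensed copy of the collector, repeated only so the demo is self-contained.
final class StackingCollector<T> {
    private static final class Node<T> {
        final T element;
        Node<T> next;
        Node(T element) { this.element = element; }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    void push(T element) {
        Node<T> newHead = new Node<>(element);
        Node<T> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }

    boolean swapAndApplyFIFO(Consumer<T> consumer) {
        Node<T> current = head.getAndSet(null); // detach everything collected so far
        Node<T> prev = null;
        while (current != null) {               // reverse in place: LIFO becomes FIFO
            Node<T> next = current.next;
            current.next = prev;
            prev = current;
            current = next;
        }
        boolean hasElements = prev != null;
        for (; prev != null; prev = prev.next) {
            consumer.accept(prev.element);
        }
        return hasElements;
    }
}

// Hypothetical demo class.
final class StackingCollectorDemo {
    public static void main(String[] args) {
        StackingCollector<String> collector = new StackingCollector<>();
        List<String> seen = new ArrayList<>();

        collector.push("first");
        collector.push("second");
        collector.push("third");

        boolean processed = collector.swapAndApplyFIFO(seen::add);
        System.out.println(processed + " " + seen); // prints "true [first, second, third]"

        // Nothing was pushed since the last call, so this reports false.
        System.out.println(collector.swapAndApplyFIFO(seen::add)); // prints "false"
    }
}
```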
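To make the worker side concrete, here is a rough sketch of a single worker thread that owns such a stack of tasks. It is not the author's Executor: the name `StackWorker`, the shutdown handling, and the `parkNanos` idle strategy are assumptions made for illustration, and the stack is inlined into the worker to keep the example short.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of one worker that drains a Treiber stack of tasks.
final class StackWorker {
    private static final class Node {
        final Runnable task;
        Node next;
        Node(Runnable task) { this.task = task; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final Thread thread;
    private volatile boolean running = true;

    StackWorker(String name) {
        thread = new Thread(this::loop, name);
        thread.setDaemon(true);
        thread.start();
    }

    // Called from any thread: a single CAS in the uncontended case.
    void submit(Runnable task) {
        Node node = new Node(task);
        Node old;
        do {
            old = head.get();
            node.next = old;
        } while (!head.compareAndSet(old, node));
    }

    private void loop() {
        while (true) {
            // Read the flag before detaching, so nothing pushed before
            // shutdown() was requested can be missed.
            boolean stopping = !running;
            Node batch = reverse(head.getAndSet(null));
            if (batch == null) {
                if (stopping) {
                    return;
                }
                LockSupport.parkNanos(1_000); // assumed idle strategy
                continue;
            }
            // The detached list is private to this thread: no synchronization needed.
            for (Node n = batch; n != null; n = n.next) {
                n.task.run();
            }
        }
    }

    // In-place reversal, so tasks run in submission order.
    private static Node reverse(Node current) {
        Node prev = null;
        while (current != null) {
            Node next = current.next;
            current.next = prev;
            prev = current;
            current = next;
        }
        return prev;
    }

    // Tasks submitted after shutdown() is called may never run.
    void shutdown() {
        running = false;
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A full executor would hold several such workers and pick one per submission, for example in round-robin order. The sketch also does not guard against a task throwing an exception, which would terminate the worker thread.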
Conclusion
The new Executor implementation has quite low overhead and easily handles large numbers of small tasks.
The Executor code can be found here.