The combination of RecursiveTask
implementations running in a ForkJoinPool
allows tasks to be defined that may spawn subtasks that in turn may run asynchronously. The ForkJoinPool
manages efficient processing of those tasks.
This article presents a simple calculator application to evaluate a formula defined as a List
presents a single-threaded recursive solution, and then converts that solution to use RecursiveTasks
executed in a ForkJoinPool
.
Recursive Solution
A formula to be computed is defined as follows:
public enum Operator { ADD, MULTIPLY; }
public static List<?> FORMULA =
List.of(Operator.ADD,
List.of(Operator.MULTIPLY, 1, 2, 3), 4, 5,
List.of(Operator.ADD, 6, 7, 8,
List.of(Operator.MULTIPLY, 9, 10, 11)));
A formula is either a Number
or a List
consisting of an Operator
followed by other formulae. For illustration, the Lisp equivalent of FORMULA
would be:
(+ (* 1 2 3) 4 5
(+ 6 7 8
(* 9 10 11)))
A Task
class is defined to solve formulae:
public static class Task {
private final Object formula;
public Task(Object formula) { this.formula = formula; }
public Integer compute() {
Integer result = null;
if (formula instanceof Number) {
result = ((Number) formula).intValue();
} else {
List<?> list = (List<?>) formula;
Operator operator = (Operator) list.get(0);
List<Task> subtasks =
list.subList(1, list.size())
.stream()
.map(Task::new)
.collect(toList());
IntStream operands =
subtasks.stream()
.map(Task::compute)
.mapToInt(Integer::intValue);
switch (operator) {
case ADD:
result = operands.sum();
break;
case MULTIPLY:
result = operands.reduce(1, (x, y) -> x * y);
break;
}
}
System.out.println(formula + " -> " + result);
return result;
}
}
The compute()
method evaluates the formula by creating another Task
instance to evaluate each operand recursively by calling compute()
. The actual mechanics are to create a List
of Task
s and then map the Stream
of Task
s to an IntStream
by calling Task.compute()
which is evaluated based on the operator.
The static main(String[])
function simply instantiates a Task
and calls compute()
.
public static void main(String[] argv) {
Task task = new Task(FORMULA);
System.out.println("Result: " + task.compute());
}
Which generates the following output:
1 -> 1
2 -> 2
3 -> 3
[MULTIPLY, 1, 2, 3] -> 6
4 -> 4
5 -> 5
6 -> 6
7 -> 7
8 -> 8
9 -> 9
10 -> 10
11 -> 11
[MULTIPLY, 9, 10, 11] -> 990
[ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]] -> 1011
[ADD, [MULTIPLY, 1, 2, 3], 4, 5, [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]]] -> 1026
Result: 1026
The next chapter details how to convert this solution to use ForkJoinPool
to enable some level of parallel processing.
ForkJoinPool Solution
The Task
class is modified to extend RecursiveTask<Integer>
. When a subtask is instantiated, RecursiveTask.fork()
is called to asynchronously execute this task in the pool the current task is running in. In the IntStream
, RecursiveTask.join()
is called to wait for the subtask to complete (if it hasn't already) and return the result of the compute()
method.
public static class Task extends RecursiveTask<Integer> {
...
@Override
public Integer compute() {
Integer result = null;
if (formula instanceof Number) {
...
} else {
...
List<Task> subtasks =
list.subList(1, list.size())
.stream()
.map(Task::new)
.peek(RecursiveTask::fork)
.collect(toList());
IntStream operands =
subtasks.stream()
.map(RecursiveTask::join)
.mapToInt(Integer::intValue);
...
}
System.out.println(Thread.currentThread() + "\t"
+ formula + " -> " + result);
return result;
}
}
The executing Thread
is included in the output to demonstrate the behavior. The main(String[])
function creates a ForkJoinPool
, the Task
to compute FORMULA
, and uses the pool to invoke the Task
. The result is obtained through Task.join()
to wait for the computation to complete.
public static int N = 10;
public static void main(String[] argv) {
ForkJoinPool pool = new ForkJoinPool(N);
RecursiveTask<Integer> task = new Task(FORMULA);
pool.invoke(task);
System.out.println("Result: " + task.join());
}
Output using 10 threads (N = 10
):
Thread[ForkJoinPool-1-worker-31,5,main] 2 -> 2
Thread[ForkJoinPool-1-worker-19,5,main] 8 -> 8
Thread[ForkJoinPool-1-worker-23,5,main] 4 -> 4
Thread[ForkJoinPool-1-worker-17,5,main] 3 -> 3
Thread[ForkJoinPool-1-worker-9,5,main] 1 -> 1
Thread[ForkJoinPool-1-worker-27,5,main] 5 -> 5
Thread[ForkJoinPool-1-worker-3,5,main] 7 -> 7
Thread[ForkJoinPool-1-worker-21,5,main] 6 -> 6
Thread[ForkJoinPool-1-worker-17,5,main] 11 -> 11
Thread[ForkJoinPool-1-worker-23,5,main] 10 -> 10
Thread[ForkJoinPool-1-worker-31,5,main] 9 -> 9
Thread[ForkJoinPool-1-worker-5,5,main] [MULTIPLY, 1, 2, 3] -> 6
Thread[ForkJoinPool-1-worker-31,5,main] [MULTIPLY, 9, 10, 11] -> 990
Thread[ForkJoinPool-1-worker-13,5,main] [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]] -> 1011
Thread[ForkJoinPool-1-worker-19,5,main] [ADD, [MULTIPLY, 1, 2, 3], 4, 5, [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]]] -> 1026
Result: 1026
And with 1 pool thread (N = 1
):
Thread[ForkJoinPool-1-worker-3,5,main] 1 -> 1
Thread[ForkJoinPool-1-worker-3,5,main] 2 -> 2
Thread[ForkJoinPool-1-worker-3,5,main] 3 -> 3
Thread[ForkJoinPool-1-worker-3,5,main] [MULTIPLY, 1, 2, 3] -> 6
Thread[ForkJoinPool-1-worker-3,5,main] 4 -> 4
Thread[ForkJoinPool-1-worker-3,5,main] 5 -> 5
Thread[ForkJoinPool-1-worker-3,5,main] 6 -> 6
Thread[ForkJoinPool-1-worker-3,5,main] 7 -> 7
Thread[ForkJoinPool-1-worker-3,5,main] 8 -> 8
Thread[ForkJoinPool-1-worker-3,5,main] 9 -> 9
Thread[ForkJoinPool-1-worker-3,5,main] 10 -> 10
Thread[ForkJoinPool-1-worker-3,5,main] 11 -> 11
Thread[ForkJoinPool-1-worker-3,5,main] [MULTIPLY, 9, 10, 11] -> 990
Thread[ForkJoinPool-1-worker-3,5,main] [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]] -> 1011
Thread[ForkJoinPool-1-worker-3,5,main] [ADD, [MULTIPLY, 1, 2, 3], 4, 5, [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]]] -> 1026
Result: 1026
Which (unsurprisingly) calculates the formulae in the same order as the recursive solution.
Summary
Single threaded recursive solutions may be converted to RecursiveTask
1 implementations and invoked through a ForkJoinPool
to enable efficient processing of subtasks.
[1] Implementation class of ForkJoinTask
. ↩
Top comments (0)