In a previous topic I found out that my task is a good candidate for FJP/Executors.newWorkStealingPool().
Let me recall the code base (it is artificial code that demonstrates my real problem):
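    import java.util.concurrent.ArrayBlockingQueue
    import java.util.concurrent.CompletableFuture
    import java.util.concurrent.ThreadPoolExecutor
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicLong
    import java.util.function.Supplier

    // The declaration of counter was omitted from the snippet; an AtomicLong is assumed here
    val counter = AtomicLong()

    fun main(args: Array<String>) {
        val start = System.currentTimeMillis()
        Internal().doWork()
        println("Duration is ${(System.currentTimeMillis() - start) / 1000} sec")
    }

    class Internal {
        fun doWork() {
            val pool = ThreadPoolExecutor(
                3, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                ArrayBlockingQueue(1000),
            )
            val future = CompletableFuture.supplyAsync(
                {
                    // 1st subtask: 10 external calls, summed
                    val future1 = CompletableFuture.supplyAsync(
                        {
                            (1..10).map {
                                CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                            }.sumOf { it.join() }
                        },
                        pool,
                    )
                    // 2nd subtask: 5 external calls, summed
                    val future2 = CompletableFuture.supplyAsync(
                        {
                            (1..5).map {
                                CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                            }.sumOf { it.join() }
                        },
                        pool,
                    )
                    // aggregate
                    future1.join() + future2.join()
                },
                pool,
            )
            println(future.join())
        }
    }

    // Simulates a slow external call
    class SingleExternalCall : Supplier<Int> {
        override fun get(): Int {
            Thread.sleep(5000)
            return counter.incrementAndGet().toInt()
        }
    }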
Let me make several observations:
- In most tutorials about FJP, the authors implement RecursiveAction/RecursiveTask and then do something like this:
    MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
    long mergedResult = forkJoinPool.invoke(myRecursiveTask);
    System.out.println("mergedResult = " + mergedResult);
The trick here is that a RecursiveTask knows how to decide whether a task is small enough to execute in place; otherwise it splits the task into smaller tasks of the same kind:
    import java.util.concurrent.RecursiveTask;

    public class MyRecursiveTask extends RecursiveTask<Long> {
        private long workLoad = 0;

        public MyRecursiveTask(long workLoad) {
            this.workLoad = workLoad;
        }

        @Override
        protected Long compute() {
            // if work is above threshold, break tasks up into smaller tasks
            if (this.workLoad > 16) {
                System.out.println("Splitting workLoad : " + this.workLoad);
                long workload1 = this.workLoad / 2;
                long workload2 = this.workLoad - workload1;
                MyRecursiveTask subtask1 = new MyRecursiveTask(workload1);
                MyRecursiveTask subtask2 = new MyRecursiveTask(workload2);
                subtask1.fork();
                subtask2.fork();
                long result = 0;
                result += subtask1.join();
                result += subtask2.join();
                return result;
            } else {
                System.out.println("Doing workLoad myself: " + this.workLoad);
                return workLoad * 3;
            }
        }
    }
- Executors.newWorkStealingPool() returns a ForkJoinPool:
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
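As a quick check of that observation, here is a minimal Java sketch. The class name WorkStealingPoolCheck and the helper isAsyncForkJoinPool are made up for illustration; only the Executors and ForkJoinPool calls come from the JDK. It inspects the executor returned by the factory method instead of relying on the source listing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class WorkStealingPoolCheck {
    // Hypothetical helper: true when the executor is a ForkJoinPool created in async (FIFO) mode.
    static boolean isAsyncForkJoinPool(ExecutorService executor) {
        return executor instanceof ForkJoinPool
                && ((ForkJoinPool) executor).getAsyncMode();
    }

    public static void main(String[] args) {
        ExecutorService wsp = Executors.newWorkStealingPool();
        try {
            System.out.println("class = " + wsp.getClass().getName());
            System.out.println("async ForkJoinPool = " + isAsyncForkJoinPool(wsp));
            System.out.println("parallelism = " + ((ForkJoinPool) wsp).getParallelism());
        } finally {
            wsp.shutdown();
        }
    }
}
```

The last constructor argument (true) in the listing above is the asyncMode flag, which is what getAsyncMode() reports here.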
So my question is:
Is there any reason to use code like this?
    val wsp = Executors.newWorkStealingPool()
    CompletableFuture.supplyAsync({
        // some code without using RecursiveAction/Task
    }, wsp)
Will it differ from
    val cachedThreadPool = Executors.newCachedThreadPool()
    CompletableFuture.supplyAsync({
        // some code without using RecursiveAction/Task
    }, cachedThreadPool)
?
Or would it be better to use the pool's methods directly, like
    pool.submit(someCallable)
?
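To make the variants above concrete, here is a minimal runnable Java sketch. The class PoolVariants and its two helper methods are hypothetical names introduced only for this illustration. It passes a plain lambda (no RecursiveAction/Task) to each pool, once through CompletableFuture.supplyAsync and once through submit, and reports whether the executing thread is a ForkJoinWorkerThread. It does not measure scheduling or work-stealing behaviour, so it cannot settle which variant is better; it only shows that both pools accept such code and which kind of thread runs it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;

public class PoolVariants {
    // Runs a plain lambda via supplyAsync and reports whether a ForkJoinPool worker executed it.
    static boolean ranOnForkJoinWorker(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread() instanceof ForkJoinWorkerThread, executor)
                .join();
    }

    // Same check, but the Callable is handed to the pool directly with submit().
    static boolean submittedOnForkJoinWorker(ExecutorService executor) {
        Callable<Boolean> someCallable = () -> Thread.currentThread() instanceof ForkJoinWorkerThread;
        Future<Boolean> future = executor.submit(someCallable);
        try {
            return future.get();
        } catch (Exception e) {
            // Wrap checked exceptions so callers do not need a throws clause
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService wsp = Executors.newWorkStealingPool();
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        try {
            System.out.println("supplyAsync, work-stealing pool: " + ranOnForkJoinWorker(wsp));
            System.out.println("supplyAsync, cached pool:        " + ranOnForkJoinWorker(cachedThreadPool));
            System.out.println("submit, work-stealing pool:      " + submittedOnForkJoinWorker(wsp));
            System.out.println("submit, cached pool:             " + submittedOnForkJoinWorker(cachedThreadPool));
        } finally {
            wsp.shutdown();
            cachedThreadPool.shutdown();
        }
    }
}
```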
Also, I am aware that the Java Stream API uses commonPool, which is an FJP instance shared by the whole application. But code written with streams doesn't define a RecursiveAction/Task either, so I don't understand why FJP was chosen for the Stream API.
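The commonPool observation can be checked with a small Java sketch. The class name StreamThreads and its helper are hypothetical; the sketch only records which threads process the elements of a parallel stream and says nothing about why FJP was chosen.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class StreamThreads {
    // Collects the names of the threads that process elements of a parallel stream.
    static Set<String> threadNamesUsedByParallelStream() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Typically lists the calling thread plus workers named like "ForkJoinPool.commonPool-worker-N";
        // the exact set depends on the machine and on the common pool's parallelism.
        threadNamesUsedByParallelStream().forEach(System.out::println);
    }
}
```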