
java - Is there a reason to use ForkJoinPool/Executors.newWorkStealingPool() without implementing RecursiveAction/RecursiveTask?


In a previous topic I found out that my task is a good candidate for FJP (ForkJoinPool) / Executors.newWorkStealingPool().

Let me recall the code base (it is artificial code that demonstrates my real problem):

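import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Supplier

fun main(args: Array<String>) {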
    val start = System.currentTimeMillis()
    Internal().doWork()
    println("Duration is ${(System.currentTimeMillis() - start)/1000} sec")
}

class Internal {

    fun doWork() {

        val pool = ThreadPoolExecutor(
            3, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            ArrayBlockingQueue(1000),
        )

        val future = CompletableFuture.supplyAsync(
            {
                // 1 subtask
                val future1 = CompletableFuture.supplyAsync(
                    {
                        (1..10).map {
                            CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                        }.sumOf { it.join() }
                    },
                    pool,
                )
                // 2 subtask
                val future2 = CompletableFuture.supplyAsync(
                    {
                        (1..5).map {
                            CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                        }.sumOf { it.join() }
                    },
                    pool,
                )
                // aggregate
                future1.join() + future2.join()
            },
            pool,
        )
        println(future.join())
    }

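    class SingleExternalCall : Supplier<Int> {

        override fun get(): Int {
            // Simulates a slow external call
            Thread.sleep(5000)
            return counter.incrementAndGet().toInt()
        }
    }
}

// Not shown in the original snippet; assumed here to be a shared atomic counter
private val counter = java.util.concurrent.atomic.AtomicLong()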

Let me provide several arguments:

  1. In most tutorials about FJP, authors implement RecursiveAction/RecursiveTask and then do something like this:
  MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
  long mergedResult = forkJoinPool.invoke(myRecursiveTask);
  System.out.println("mergedResult = " + mergedResult);

The trick here is that a RecursiveTask knows how to decide whether a task is small enough to execute in place; otherwise it divides itself into smaller tasks of the same kind:

import java.util.concurrent.RecursiveTask;

public class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            long workload1 = this.workLoad / 2;
            long workload2 = this.workLoad - workload1;

            MyRecursiveTask subtask1 = new MyRecursiveTask(workload1);
            MyRecursiveTask subtask2 = new MyRecursiveTask(workload2);

            subtask1.fork();
            subtask2.fork();

            long result = 0;
            result += subtask1.join();
            result += subtask2.join();
            return result;

        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }
}
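
To make the split-or-compute decision concrete, here is a minimal runnable sketch along the same lines. The class names SplitDemo and TripleTask are hypothetical stand-ins, not part of the tutorial code, and it swaps in the fork-one/compute-the-other idiom instead of forking both subtasks as above. Since every leaf returns workLoad * 3 and splitting preserves the total work load, invoking it with 128 should give 384.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SplitDemo {

    // Hypothetical stand-in for MyRecursiveTask, without the logging
    static class TripleTask extends RecursiveTask<Long> {
        private static final long THRESHOLD = 16;
        private final long workLoad;

        TripleTask(long workLoad) {
            this.workLoad = workLoad;
        }

        @Override
        protected Long compute() {
            if (workLoad <= THRESHOLD) {
                // Small enough: do the work in place
                return workLoad * 3;
            }
            long half = workLoad / 2;
            TripleTask left = new TripleTask(half);
            TripleTask right = new TripleTask(workLoad - half);
            left.fork();                         // queue the left half so another worker can steal it
            long rightResult = right.compute();  // run the right half on the current thread
            return left.join() + rightResult;
        }
    }

    static long run(long workLoad) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new TripleTask(workLoad));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(128)); // prints 384
    }
}
```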

  2. Executors.newWorkStealingPool() == ForkJoinPool

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
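
This can also be checked at runtime. A small sketch (the class name WorkStealingPoolCheck is just for illustration) that asks the returned executor for its concrete type and for the async mode flag, which corresponds to the last constructor argument above:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class WorkStealingPoolCheck {

    // True if the executor handed out by Executors is a ForkJoinPool
    static boolean isForkJoinPool() {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            return pool instanceof ForkJoinPool;
        } finally {
            pool.shutdown();
        }
    }

    // True if the pool was created in async (FIFO) mode
    static boolean usesAsyncMode() {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            return ((ForkJoinPool) pool).getAsyncMode();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ForkJoinPool: " + isForkJoinPool());
        System.out.println("async mode:   " + usesAsyncMode());
    }
}
```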

So my question is:

Is there a reason to use code like this?

    var wsp = Executors.newWorkStealingPool()
    CompletableFuture.supplyAsync({
          //some code without using RecursiveAction/Task 
    }, wsp)

Will it differ from

    var cachedThreadPool = Executors.newCachedThreadPool()
    CompletableFuture.supplyAsync({
          //some code without using RecursiveAction/Task 
    }, cachedThreadPool)

?
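
One difference that can at least be observed directly is which kind of thread ends up running the supplier. The following is only a sketch of that observation (PoolComparison and ranOnForkJoinWorker are made-up names), and it says nothing about which pool performs better for blocking calls:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinWorkerThread;

class PoolComparison {

    // Runs a plain Supplier (no RecursiveAction/Task) on the given executor
    // and reports whether it was executed by a ForkJoinWorkerThread.
    static boolean ranOnForkJoinWorker(ExecutorService executor) {
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread() instanceof ForkJoinWorkerThread, executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(ranOnForkJoinWorker(Executors.newWorkStealingPool())); // true
        System.out.println(ranOnForkJoinWorker(Executors.newCachedThreadPool())); // false
    }
}
```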

Or maybe it would be better to use the pool's methods directly, like

pool.submit(someCallable) 

?
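
For reference, a minimal sketch of what the direct variant could look like. DirectSubmit and the body of someCallable are placeholders for illustration; only submit and Future.get are actual ExecutorService/Future API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DirectSubmit {

    static int submitAndGet() {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            // Placeholder task standing in for the real work
            Callable<Integer> someCallable = () -> 21 * 2;
            Future<Integer> future = pool.submit(someCallable);
            return future.get(); // blocks until the task has finished
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints 42
    }
}
```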

Also, I am aware that the Java Stream API uses commonPool, which is an FJP instance shared across the whole application. But stream code doesn't implement RecursiveAction/Task either, so I don't understand why FJP was chosen for the Stream API.
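
The commonPool usage can be seen by recording thread names inside a parallel stream. This is a rough sketch (StreamThreads is an illustrative name); the exact set of names depends on the machine, and the calling thread usually takes part in the work as well, so only the sum is deterministic:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class StreamThreads {

    // Names of all threads that processed at least one element
    static final Set<String> threadNames = ConcurrentHashMap.newKeySet();

    static int tripledSum() {
        return IntStream.rangeClosed(1, 100)
                .parallel()
                .peek(i -> threadNames.add(Thread.currentThread().getName()))
                .map(i -> i * 3)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(tripledSum()); // prints 15150
        // Typically the calling thread plus some ForkJoinPool.commonPool workers
        System.out.println(threadNames);
    }
}
```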
