1. Overview

Java 7 introduced the fork/join framework. It provides tools to help speed up parallel processing by attempting to use all available processor cores. It accomplishes this through a divide and conquer approach.

In practice, this means that the framework first “forks,” recursively breaking the task into smaller independent subtasks until they are simple enough to run asynchronously.

After that, the “join” part begins. The results of all subtasks are recursively joined into a single result. In the case of a task that returns void, the program simply waits until every subtask has completed.

To provide effective parallel execution, the fork/join framework uses a pool of threads called the ForkJoinPool. This pool manages worker threads of type ForkJoinWorkerThread.
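The sketch below checks that tasks really run on ForkJoinWorkerThread instances. ThreadInspectingTask and WorkerThreadDemo are hypothetical names. It assumes a small custom pool, so that a pool worker executes the task rather than the submitting thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

// Hypothetical task that reports whether it runs on a fork/join worker thread
class ThreadInspectingTask extends RecursiveTask<Boolean> {
    @Override
    protected Boolean compute() {
        // Workers created by a ForkJoinPool are ForkJoinWorkerThread instances
        return Thread.currentThread() instanceof ForkJoinWorkerThread;
    }
}

class WorkerThreadDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            boolean onWorker = pool.invoke(new ThreadInspectingTask());
            System.out.println("Ran on a ForkJoinWorkerThread: " + onWorker);
        } finally {
            pool.shutdown();
        }
    }
}
```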

2. ForkJoinPool

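The ForkJoinPool is the heart of the framework. It is an implementation of the ExecutorService that manages worker threads and provides us with tools to get information about the thread pool's state and performance.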
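Here is a minimal sketch of those monitoring tools. It prints several of the statistics that ForkJoinPool exposes. PoolStatsDemo and its describe() helper are hypothetical names. The numbers are snapshots of a live pool, so they can differ from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

class PoolStatsDemo {
    // Collects a few of the monitoring values that ForkJoinPool exposes
    static String describe(ForkJoinPool pool) {
        return "parallelism=" + pool.getParallelism()
            + ", poolSize=" + pool.getPoolSize()
            + ", active=" + pool.getActiveThreadCount()
            + ", queuedTasks=" + pool.getQueuedTaskCount()
            + ", steals=" + pool.getStealCount()
            + ", quiescent=" + pool.isQuiescent();
    }

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(2);
        // Submit a trivial Callable so that the pool starts at least one worker
        int answer = pool.submit(() -> 6 * 7).join();
        System.out.println("answer=" + answer);
        System.out.println(describe(pool));
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```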

Worker threads can execute only one task at a time, but the ForkJoinPool doesn't create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue (or deque, pronounced “deck”) that stores tasks.

This architecture is vital for balancing the threads' workload with the help of the work-stealing algorithm.

2.1. Work-Stealing Algorithm

Simply put, free threads try to “steal” work from deques of busy threads.

By default, a worker thread gets tasks from the head of its own deque. When its deque is empty, the thread takes a task from the tail of the deque of another busy thread, or from the global entry queue, since this is where the biggest pieces of work are likely to be located.

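This approach minimizes the possibility that threads will compete for tasks. It also reduces the number of times a thread has to go looking for work, because it works on the biggest available chunks of work first.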
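The following sketch lets us observe work stealing, although we cannot control it. TrackingAction and WorkStealingDemo are hypothetical names. The action splits a range down to single elements and records which worker processed each one. getStealCount() then reports an estimate of how many tasks were stolen. The distribution depends on timing, so the output differs between runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical action that splits [from, to) down to single elements and records
// which worker thread processed each element
class TrackingAction extends RecursiveAction {
    private final int from;
    private final int to;
    private final Map<String, Integer> leavesPerWorker;

    TrackingAction(int from, int to, Map<String, Integer> leavesPerWorker) {
        this.from = from;
        this.to = to;
        this.leavesPerWorker = leavesPerWorker;
    }

    @Override
    protected void compute() {
        if (to - from == 1) {
            // Leaf: count it against the thread that actually ran it
            leavesPerWorker.merge(Thread.currentThread().getName(), 1, Integer::sum);
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll() forks the subtasks; a forked task waits in this worker's
        // deque until it is run locally or stolen by an idle worker
        invokeAll(new TrackingAction(from, mid, leavesPerWorker),
                  new TrackingAction(mid, to, leavesPerWorker));
    }
}

class WorkStealingDemo {
    public static void main(String[] args) {
        Map<String, Integer> leavesPerWorker = new ConcurrentHashMap<>();
        ForkJoinPool pool = new ForkJoinPool(4);
        pool.invoke(new TrackingAction(0, 1_024, leavesPerWorker));
        // How the leaves are split between workers varies from run to run
        System.out.println("Leaves per worker: " + leavesPerWorker);
        System.out.println("Estimated steals: " + pool.getStealCount());
        pool.shutdown();
    }
}
```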

2.2. ForkJoinPool Instantiation

In Java 8, the most convenient way to get access to the instance of the ForkJoinPool is to use its static method commonPool(). This will provide a reference to the common pool, which is a default thread pool for every ForkJoinTask.

According to Oracle’s documentation, using the predefined common pool reduces resource consumption since this discourages the creation of a separate thread pool per task.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
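The following is a minimal sketch of working with the common pool obtained this way. SquareTask and CommonPoolDemo are hypothetical names. The sketch also relies on documented behavior: calling fork() from a thread that is not a pool worker schedules the task in the common pool.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical minimal task used only to exercise the common pool
class SquareTask extends RecursiveTask<Integer> {
    private final int value;

    SquareTask(int value) {
        this.value = value;
    }

    @Override
    protected Integer compute() {
        return value * value;
    }
}

class CommonPoolDemo {
    public static void main(String[] args) {
        ForkJoinPool commonPool = ForkJoinPool.commonPool();

        // commonPool() always returns the same shared, JVM-wide instance
        System.out.println("Same instance: " + (commonPool == ForkJoinPool.commonPool()));

        // Target parallelism of the common pool; it can be tuned with the
        // java.util.concurrent.ForkJoinPool.common.parallelism system property
        System.out.println("Parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        // Explicitly submitting to the common pool...
        int viaInvoke = commonPool.invoke(new SquareTask(6));
        // ...and forking outside of any pool, which also uses the common pool
        int viaFork = new SquareTask(7).fork().join();

        System.out.println(viaInvoke + " " + viaFork); // 36 49
    }
}
```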

We can achieve the same behavior in Java 7 by creating a ForkJoinPool and assigning it to a public static field of a utility class:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Now we can easily access it:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

With ForkJoinPool’s constructors, we can create a custom thread pool with a specific level of parallelism, thread factory and exception handler. Here the pool has a parallelism level of 2. This means that the pool targets two active worker threads, so it will use at most two processor cores at a time.
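Combining these two ideas, a PoolUtil class like the one sketched below could hold a custom pool. The pool is configured through the four-argument constructor ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode). The thread-naming scheme and the logging handler are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

// Utility holder for an application-wide pool (names and settings are illustrative)
class PoolUtil {
    private static final AtomicInteger WORKER_COUNTER = new AtomicInteger();

    // Wraps the default factory and gives each worker a recognizable name
    private static final ForkJoinPool.ForkJoinWorkerThreadFactory NAMING_FACTORY = pool -> {
        ForkJoinWorkerThread worker =
            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setName("app-fj-worker-" + WORKER_COUNTER.getAndIncrement());
        return worker;
    };

    // Called when a worker thread dies from an unrecoverable error; exceptions
    // thrown by tasks are instead rethrown to the caller of join()/invoke()
    private static final Thread.UncaughtExceptionHandler LOGGING_HANDLER =
        (thread, error) -> System.err.println(thread.getName() + " failed: " + error);

    // Parallelism 2, custom factory, custom handler, default (non-async) mode.
    // Declared last so that the fields above are initialized first.
    public static final ForkJoinPool forkJoinPool =
        new ForkJoinPool(2, NAMING_FACTORY, LOGGING_HANDLER, false);

    private PoolUtil() {
    }
}
```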

3. ForkJoinTask

ForkJoinTask is the base type for tasks executed inside ForkJoinPool. In practice, one of its two most commonly used subclasses should be extended: RecursiveAction for void tasks and RecursiveTask for tasks that return a value. Both have an abstract compute() method in which the task’s logic is defined.

3.1. RecursiveAction

In the example below, we use a String called workload to represent the unit of work to be processed. For demonstration purposes, the task is a nonsensical one: It simply uppercases its input and logs it.

To demonstrate the forking behavior of the framework, the example uses the createSubtasks() method to split the task whenever workload.length() is larger than a specified threshold.

The String is recursively divided into substrings, creating CustomRecursiveAction instances that are based on these substrings.

As a result, the method returns a List<CustomRecursiveAction>.

The list is submitted to the ForkJoinPool using the invokeAll() method.

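import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;

public class CustomRecursiveAction extends RecursiveAction {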

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger = 
      Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by " 
          + Thread.currentThread().getName());
    }
}

We can use this pattern to develop our own RecursiveAction classes. To do this, we create an object that represents the total amount of work, choose a suitable threshold, define a method to divide the work, and define a method to do the work.
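The next sketch applies the same pattern to a different kind of work. The hypothetical IncrementAction adds 1 to each element of an array in place, and the comments map its parts to the four steps just listed. The threshold of 1,000 is an arbitrary assumption, not a tuned value.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical RecursiveAction that adds 1 to every element of an array in place
class IncrementAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000;   // 2. a suitable threshold

    private final long[] array;                   // 1. the total amount of work...
    private final int from;                       //    ...restricted to [from, to)
    private final int to;

    IncrementAction(long[] array, int from, int to) {
        this.array = array;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            process();
        } else {
            divide();
        }
    }

    // 3. divide the work into two independent halves and run them in parallel
    private void divide() {
        int mid = (from + to) >>> 1;
        invokeAll(new IncrementAction(array, from, mid),
                  new IncrementAction(array, mid, to));
    }

    // 4. do the work directly on a range that is small enough
    private void process() {
        for (int i = from; i < to; i++) {
            array[i]++;
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        ForkJoinPool.commonPool().invoke(new IncrementAction(data, 0, data.length));
        System.out.println(data[0] + " " + data[data.length - 1]); // 1 1
    }
}
```

Because the subtasks write to disjoint index ranges, they need no extra synchronization. Once invoke() returns, all of their writes are visible to the caller.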

3.2. RecursiveTask

For tasks that return a value, the logic here is similar.

The difference is that the results of the subtasks are combined into a single result:

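import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {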
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

In this example, we use an array stored in the arr field of the CustomRecursiveTask class to represent the work. Through createSubtasks(), the compute() method recursively divides the task into smaller pieces of work until each piece is no larger than the threshold. The invokeAll() method then forks the subtasks into the pool that the current task is running in, waits for all of them to complete, and returns them as a collection. Since ForkJoinTask implements Future, this is a collection of Future objects.

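To collect the results, the join() method is called for each subtask. Because invokeAll() has already waited for every subtask to finish, join() simply returns each subtask’s result.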

We’ve accomplished this here using Java 8’s Stream API. We use the sum() method to combine the subresults into the final result.
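Copying the array with Arrays.copyOfRange() at every split is simple, but it allocates a new array for each subtask. A common alternative passes index bounds into one shared array instead. The hypothetical RangeRecursiveTask below sketches this approach and keeps the same filter-and-multiply logic as CustomRecursiveTask.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of CustomRecursiveTask that splits by index range
// instead of copying sub-arrays with Arrays.copyOfRange()
class RangeRecursiveTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 20;

    private final int[] arr;
    private final int from;  // inclusive
    private final int to;    // exclusive

    RangeRecursiveTask(int[] arr, int from, int to) {
        this.arr = arr;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            return processing();
        }
        int mid = (from + to) >>> 1;
        RangeRecursiveTask left = new RangeRecursiveTask(arr, from, mid);
        RangeRecursiveTask right = new RangeRecursiveTask(arr, mid, to);
        invokeAll(left, right);             // runs both halves and waits for them
        return left.join() + right.join();  // both are done, so join() just reads results
    }

    // Same filter/map/sum logic as CustomRecursiveTask.processing()
    private int processing() {
        int sum = 0;
        for (int i = from; i < to; i++) {
            int a = arr[i];
            if (a > 10 && a < 27) {
                sum += a * 10;
            }
        }
        return sum;
    }
}
```

Consider an array that holds the values 1 to 100. For it, ForkJoinPool.commonPool().invoke(new RangeRecursiveTask(arr, 0, arr.length)) returns 2960, which is the sum of 11 to 26 multiplied by 10.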

4. Submitting Tasks to the ForkJoinPool

We can use a few approaches to submit tasks to the thread pool.

Let’s start with the submit() or execute() method (their use cases are the same):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

The invoke() method forks the task and waits for the result, so no manual joining is needed:

int result = forkJoinPool.invoke(customRecursiveTask);
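The sketch below compares the three submission methods side by side. SquareTask and SubmissionDemo are hypothetical names, and the parallelism level of 2 is an arbitrary choice.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical minimal task: squares a single number
class SquareTask extends RecursiveTask<Integer> {
    private final int value;

    SquareTask(int value) {
        this.value = value;
    }

    @Override
    protected Integer compute() {
        return value * value;
    }
}

class SubmissionDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(2);

        // execute(): schedules the task without returning anything; join() later
        SquareTask executed = new SquareTask(3);
        pool.execute(executed);
        int viaExecute = executed.join();

        // submit(): also schedules the task, and returns it as a Future
        ForkJoinTask<Integer> submitted = pool.submit(new SquareTask(4));
        int viaSubmit = submitted.get();  // Future.get() throws checked exceptions

        // invoke(): submits the task and waits for its result in one call
        int viaInvoke = pool.invoke(new SquareTask(5));

        System.out.println(viaExecute + " " + viaSubmit + " " + viaInvoke); // 9 16 25
        pool.shutdown();
    }
}
```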

The invokeAll() method is the most convenient way to submit a sequence of ForkJoinTasks to the ForkJoinPool. It takes tasks as parameters (two tasks, varargs or a collection), forks them, and waits until all of them are done. The collection variant then returns the same collection, so the tasks (which are also Future objects) can be read back in the order in which they were produced.

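Alternatively, we can use separate fork() and join() methods. The fork() method arranges for a task to run asynchronously in the current pool (or in the common pool when called outside of any pool), but it doesn't wait for the task to finish. We use the join() method for that: it waits for the task to complete and returns its result.

In the case of RecursiveAction, join() returns nothing but null; for RecursiveTask, it returns the result of the task’s execution: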

customRecursiveTaskFirst.fork();
customRecursiveTaskLast.fork();
result = customRecursiveTaskLast.join() + customRecursiveTaskFirst.join();

Here we used the invokeAll() method to submit a sequence of subtasks to the pool. We can do the same job with fork() and join(), though this has consequences for the ordering of the results.
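A widely used idiom for fork() and join() works in three steps. We fork one subtask, compute the other in the current thread, and only then join the forked one. This keeps the current worker busy instead of leaving it waiting. The hypothetical ForkJoinSumTask below sketches this idiom. It combines the results in a fixed order, whichever half finishes first.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task that sums arr[from, to) using explicit fork() and join()
class ForkJoinSumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;

    private final long[] arr;
    private final int from;
    private final int to;

    ForkJoinSumTask(long[] arr, int from, int to) {
        this.arr = arr;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += arr[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSumTask left = new ForkJoinSumTask(arr, from, mid);
        ForkJoinSumTask right = new ForkJoinSumTask(arr, mid, to);
        left.fork();                         // make the left half available to other workers
        long rightResult = right.compute();  // work on the right half in this thread
        long leftResult = left.join();       // wait for (or run) the left half
        return leftResult + rightResult;
    }

    public static void main(String[] args) {
        long[] arr = new long[10_000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i + 1;
        }
        long sum = ForkJoinPool.commonPool().invoke(new ForkJoinSumTask(arr, 0, arr.length));
        System.out.println(sum); // 50005000
    }
}
```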

To avoid confusion, it is generally a good idea to use the invokeAll() method to submit more than one task to the ForkJoinPool.

5. Conclusion

Using the fork/join framework can speed up processing of large tasks, but to achieve this outcome, we should follow some guidelines:

  • Use as few thread pools as possible. In most cases, the best decision is to use one thread pool per application or system.
  • Use the default common thread pool if no specific tuning is needed.
  • Use a reasonable threshold for splitting ForkJoinTask into subtasks.
  • Avoid any blocking in ForkJoinTasks.

The examples used in this article are available in the linked GitHub repository.