1. 概要

このチュートリアルでは、Javaでの作業盗みの概念を見ていきます。

2. 仕事を盗むとは何ですか?

ワークスティーリングは、マルチスレッドアプリケーションでの競合を減らすことを目的としてJavaに導入されました。 これは、フォーク/結合フレームワークを使用して行われます。

2.1. 分割統治法

フォーク/結合フレームワークでは、問題またはタスクは再帰的にサブタスクに分解されます。 次に、サブタスクが個別に解決され、サブ結果が組み合わされて結果が形成されます。

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

2.2. ワーカースレッド

分解されたタスクは、スレッドプールによって提供されるワーカースレッドの助けを借りて解決されます。 各ワーカースレッドには、担当するサブタスクがあります。 これらは両端キュー( deques )に格納されます。

各ワーカースレッドは、サブタスクを両端キューの先頭から継続的にポップすることにより、両端キューからサブタスクを取得します。 ワーカースレッドの両端キューが空の場合は、すべてのサブタスクがポップオフされて完了したことを意味します。

この時点で、ワーカースレッドは、作業を「盗む」ことができるピアスレッドプールスレッドをランダムに選択します。 次に、先入れ先出し法(FIFO)を使用して、被害者の両端キューの最後からサブタスクを取得します。

3. フォーク/結合フレームワークの実装

ForkJoinPoolクラスまたはExecutorsクラスのいずれかを使用して、ワークスティーリングスレッドプールを作成できます。

ForkJoinPool commonPool = ForkJoinPool.commonPool();
ExecutorService workStealingPool = Executors.newWorkStealingPool();

Executors クラスには、オーバーロードされた newWorkStealingPool メソッドがあります。このメソッドは、レベルの並列処理を表す整数引数を取ります。

Executors.newWorkStealingPool は、ForkJoinPool.commonPoolを抽象化したものです。 唯一の違いは、 Executors.newWorkStealingPool は非同期モードでプールを作成し、ForkJoinPool.commonPoolは作成しないことです。

4. 同期スレッドプールと非同期スレッドプール

ForkJoinPool.commonPool は後入れ先出し(LIFO)キュー構成を使用しますが、 Executors.newWorkStealingPool は先入れ先出し(FIFO)キュー構成を使用します。

Doug Lea によると、FIFOアプローチにはLIFOに比べて次のような利点があります。

  • 所有者とは反対側でスティーラーを操作することにより、競合を減らします
  • これは、「大規模な」タスクを早期に生成する再帰的な分割統治アルゴリズムの特性を活用します。

上記の2番目のポイントは、古い盗まれたタスクを、それを盗んだスレッドによってさらに分解できることを意味します。

Javaドキュメントによると、asyncModetrueに設定すると、結合されないイベントスタイルのタスクでの使用に適している場合があります。

5. 実例–素数を見つける

数値のコレクションから素数を見つける例を使用して、ワークスティーリングフレームワーク計算時間の利点を示します。 また、同期スレッドプールと非同期スレッドプールの使用の違いについても説明します。

5.1. 素数の問題

数のコレクションから素数を見つけることは、計算コストのかかるプロセスになる可能性があります。 これは主に、数値のコレクションのサイズによるものです。

PrimeNumbers クラスは、素数を見つけるのに役立ちます。

public class PrimeNumbers extends RecursiveAction {

    private int lowerBound;
    private int upperBound;
    private int granularity;
    static final List<Integer> GRANULARITIES
      = Arrays.asList(1, 10, 100, 1000, 10000);
    private AtomicInteger noOfPrimeNumbers;

    PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.granularity = granularity;
        this.noOfPrimeNumbers = noOfPrimeNumbers;
    }

    // other constructors and methods

    private List<PrimeNumbers> subTasks() {
        List<PrimeNumbers> subTasks = new ArrayList<>();

        for (int i = 1; i <= this.upperBound / granularity; i++) {
            int upper = i * granularity;
            int lower = (upper - granularity) + 1;
            subTasks.add(new PrimeNumbers(lower, upper, noOfPrimeNumbers));
        }
        return subTasks;
    }

    @Override
    protected void compute() {
        if (((upperBound + 1) - lowerBound) > granularity) {
            ForkJoinTask.invokeAll(subTasks());
        } else {
            findPrimeNumbers();
        }
    }

    void findPrimeNumbers() {
        for (int num = lowerBound; num <= upperBound; num++) {
            if (isPrime(num)) {
                noOfPrimeNumbers.getAndIncrement();
            }
        }
    }

    public int noOfPrimeNumbers() {
        return noOfPrimeNumbers.intValue();
    }
}

このクラスについて注意すべきいくつかの重要な点:

  • RecursiveAction を拡張し、スレッドプールを使用したタスクの計算で使用されるcomputeメソッドを実装できるようにします。
  • 粒度値に基づいてタスクをサブタスクに再帰的に分割します
  • コンストラクターは、lowerおよびupperのバインドされた値を取ります。これらの値は、素数を決定する数値の範囲を制御します。
  • これにより、ワークスティーリングスレッドプールまたはシングルスレッドのいずれかを使用して素数を決定できます。

5.2. スレッドプールを使用して問題をより迅速に解決する

シングルスレッドの方法で、またワークスティーリングスレッドプールを使用して素数を決定しましょう。

まず、シングルスレッドアプローチを見てみましょう。

PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers();

そして今、ForkJoinPool.commonPoolアプローチ

PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
pool.shutdown();

最後に、Executors.newWorkStealingPoolアプローチを見ていきます。

PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();

ForkJoinPoolクラスのinvokeメソッドを使用して、タスクをスレッドプールに渡します。 このメソッドは、RecursiveActionのサブクラスのインスタンスを取ります。 Java Microbench Harness を使用して、操作あたりの平均時間の観点から、これらのさまざまなアプローチを相互にベンチマークします。

# Run complete. Total time: 00:04:50

Benchmark                                                      Mode  Cnt    Score   Error  Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark           avgt   20  119.885 ± 9.917  ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark  avgt   20  119.791 ± 7.811  ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread                  avgt   20  475.964 ± 7.929  ms/op

ForkJoinPool.commonPoolとExecutors.newWorkStealingPoolの両方で、シングルスレッドアプローチよりも速く素数を決定できることは明らかです。

フォーク/結合プールフレームワークを使用すると、タスクをサブタスクに分割できます。 10,000個の整数のコレクションを、1〜100、101〜200、201〜300などのバッチに分割しました。 次に、各バッチの素数を決定し、noOfPrimeNumbersメソッドで利用可能な素数の総数を作成しました。

5.3. 計算するために仕事を盗む

同期スレッドプールを使用すると、 ForkJoinPool.commonPoolは、タスクがまだ進行中である限り、スレッドをプールに配置します。 その結果、作業の盗用のレベルはタスクのレベルに依存しません。粒度。

非同期のExecutors.newWorkStealingPoolはより管理されており、作業の盗用のレベルをタスクの粒度のレベルに依存させることができます。

ForkJoinPoolクラスのgetStealCountを使用して、作業を盗むレベルを取得します。

long steals = forkJoinPool.getStealCount();

Executors.newWorkStealingPoolForkJoinPool.commonPoolのワークスティーリングカウントを決定すると、異なる動作が発生します。

Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]

ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

粒度が細かいものから粗いものに変わると(1から10,000) Executors.newWorkStealingPoolの場合、作業を盗むレベルが低下します。 したがって、タスクが分解されていない場合、スティールカウントは1になります(粒度は10,000)。

ForkJoinPool.commonPoolの動作は異なります。作業の盗用のレベルは常に高く、タスクの粒度の変化による影響はあまりありません。

技術的に言えば、素数の例は、イベントスタイルのタスクの非同期処理をサポートする例です。 これは、私たちの実装が結果の結合を強制しないためです。

Executors.newWorkStealingPool が、問題の解決にリソースを最大限に活用できる場合があります。

6. 結論

この記事では、ワークスティーリングと、fork/joinフレームワークを使用してそれを適用する方法について説明しました。 また、作業を盗む例と、それが処理時間とリソースの使用をどのように改善できるかについても見てきました。

いつものように、例の完全なソースコードは、GitHubから入手できます。