JavaのCyclicBarrier
1. 序章
CyclicBarriers は、java.util.concurrentパッケージの一部としてJava5で導入された同期構造です。
この記事では、並行性のシナリオでこの実装について説明します。
2. Javaの同時実行性–シンクロナイザー
java.util.concurrent パッケージには、相互に連携する一連のスレッドの管理に役立ついくつかのクラスが含まれています。 これらのいくつかは次のとおりです。
- CyclicBarrier
- Phaser
- CountDownLatch
- 交換器
- セマフォ
- SynchronousQueue
これらのクラスは、スレッド間の一般的な相互作用パターンのためのすぐに使える機能を提供します。
相互に通信し、一般的なパターンの1つに似ているスレッドのセットがある場合、のセットを使用してカスタムスキームを考え出す代わりに、適切なライブラリクラス(シンクロナイザーとも呼ばれます)を再利用できます。オブジェクトおよびsynchronizedキーワードをロックおよび条件付けします。
今後のCyclicBarrierに注目しましょう。
3. CyclicBarrier
CyclicBarrier は、スレッドのセットがバリアとも呼ばれる共通の実行ポイントに到達するのを互いに待機できるようにするシンクロナイザーです。
CyclicBarriers は、実行を続行する前に相互に共通点に到達するのを待機する必要がある固定数のスレッドがあるプログラムで使用されます。
バリアは、待機中のスレッドが解放された後に再利用できるため、サイクリックと呼ばれます。
4. 使用法
CyclicBarrierのコンストラクターは単純です。 共通の実行ポイントに到達したことを示すために、バリアインスタンスで await()メソッドを呼び出す必要があるスレッドの数を示す単一の整数を取ります。
public CyclicBarrier(int parties)
実行を同期する必要があるスレッドはpartiesとも呼ばれ、 await()メソッドを呼び出すことで、特定のスレッドがバリアポイントに到達したことを登録できます。
この呼び出しは同期的であり、このメソッドを呼び出すスレッドは、指定された数のスレッドがバリアで同じメソッドを呼び出すまで実行を一時停止します。 必要な数のスレッドがawait()を呼び出したこの状況は、バリアのトリップと呼ばれます。
オプションで、2番目の引数をコンストラクターに渡すことができます。これはRunnableインスタンスです。 これには、バリアをトリップする最後のスレッドによって実行されるロジックがあります。
public CyclicBarrier(int parties, Runnable barrierAction)
5. 実装
CyclicBarrier の動作を確認するために、次のシナリオを考えてみましょう。
固定数のスレッドが実行し、対応する結果をリストに格納する操作があります。 すべてのスレッドがアクションの実行を終了すると、そのうちの1つ(通常はバリアを通過する最後のスレッド)が、これらの各スレッドによってフェッチされたデータの処理を開始します。
すべてのアクションが発生するメインクラスを実装しましょう。
public class CyclicBarrierDemo {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults
= Collections.synchronizedList(new ArrayList<>());
private Random random = new Random();
private int NUM_PARTIAL_RESULTS;
private int NUM_WORKERS;
// ...
}
このクラスは非常に単純です。NUM_WORKERSは実行されるスレッドの数であり、NUM_PARTIAL_RESULTSは各ワーカースレッドが生成する結果の数です。
最後に、これらの各ワーカースレッドの結果を格納するリストであるpartialResultsがあります。 このリストはSynchronizedListであることに注意してください。これは、複数のスレッドが同時に書き込みを行うためです。また、 add()メソッドはプレーンなではスレッドセーフではありません。 ]ArrayList。
次に、各ワーカースレッドのロジックを実装しましょう。
public class CyclicBarrierDemo {
// ...
class NumberCruncherThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
// Crunch some numbers and store the partial result
for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}
}
次に、バリアが作動したときに実行されるロジックを実装します。
簡単にするために、部分的な結果リストにすべての数値を追加してみましょう。
public class CyclicBarrierDemo {
// ...
class AggregatorThread implements Runnable {
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
System.out.println(
thisThreadName + ": Computing sum of " + NUM_WORKERS
+ " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
int sum = 0;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
}
}
}
最後のステップは、 CyclicBarrier を構築し、 main()メソッドで開始することです。
public class CyclicBarrierDemo {
// Previous code
public void runSimulation(int numWorkers, int numberOfPartialResults) {
NUM_PARTIAL_RESULTS = numberOfPartialResults;
NUM_WORKERS = numWorkers;
cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
System.out.println("Spawning " + NUM_WORKERS
+ " worker threads to compute "
+ NUM_PARTIAL_RESULTS + " partial results each");
for (int i = 0; i < NUM_WORKERS; i++) {
Thread worker = new Thread(new NumberCruncherThread());
worker.setName("Thread " + i);
worker.start();
}
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.runSimulation(5, 3);
}
}
上記のコードでは、サイクリックバリアを5つのスレッドで初期化し、それぞれが計算の一部として3つの整数を生成し、結果のリストに同じものを格納します。
バリアがトリップすると、バリアをトリップした最後のスレッドがAggregatorThreadで指定されたロジックを実行します。つまり、スレッドによって生成されたすべての数値を加算します。
6. 結果
上記のプログラムの1回の実行からの出力は次のとおりです。スレッドは異なる順序で生成される可能性があるため、実行ごとに異なる結果が生成される可能性があります。
Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2
Adding 2 0 5
Adding 6 4 0
Adding 1 1 0
Adding 9 3 5
Thread 4: Final result = 46
上記の出力が示すように、スレッド4 はバリアをトリップし、最終的な集約ロジックも実行します。 また、上記の例に示すように、スレッドが開始された順序で実際に実行される必要はありません。
7. 結論
この記事では、 CyclicBarrier とは何か、そしてそれがどのような状況で役立つかを見てきました。
また、他のプログラムロジックを続行する前に、固定の実行ポイントに到達するために固定数のスレッドが必要なシナリオを実装しました。
いつものように、チュートリアルのコードはGitHubのにあります。