JavaのCyclicBarrier
1前書き
CyclicBarriers
は、Java 5で
java.util.concurrent
パッケージの一部として導入された同期コンストラクトです。
この記事では、並行処理のシナリオでこの実装を検討します。
2 Java並行性 – シンクロナイザ
java.util.concurrent
パッケージには、互いに連携し合う一連のスレッドの管理に役立ついくつかのクラスが含まれています。これらのいくつかは次のとおりです。
-
CyclicBarrier
-
フェイザー
-
CountDownLatch
-
交換者
-
スマホア
-
SynchronousQueue
これらのクラスは、スレッド間の一般的な対話パターンのための独創的な機能を提供します。互いに通信し、1つ以上のより一般的なものに従う一連のスレッドがある場合
互いに通信し、共通のパターンの1つに似た一連のスレッドがある場合は、一連のロックを使用してカスタムスキームを作成する代わりに、適切なライブラリクラス(
Synchronizers
とも呼ばれる)を再利用できます。条件オブジェクト** と
synchronized
キーワード。
今後の
CyclicBarrier
に焦点を当てましょう。
3
CyclicBarrier
CyclicBarrier
は、
barrier
とも呼ばれる、共通の実行ポイントに到達するためにスレッドのセットが互いに待機することを可能にするシンクロナイザです。
CyclicBarriers
は、実行を継続する前に、お互いが共通点に達するのを待たなければならない固定数のスレッドがあるプログラムで使用されます。
バリアは待機スレッドが解放された後に再利用できるため、
cyclic
と呼ばれます。===
4使用法
CyclicBarrier
のコンストラクタは簡単です。これは、共通の実行ポイントに到達したことを示すために、バリアインスタンスで
await()
メソッドを呼び出す必要があるスレッドの数を示す単一の整数を取ります。public CyclicBarrier(int parties)
実行を同期させる必要があるスレッドは
parties
とも呼ばれ、
await()
メソッドを呼び出すことで、特定のスレッドがバリアポイントに到達したことを登録できます。この呼び出しは同期的であり、このメソッドを呼び出すスレッドは、指定された数のスレッドがバリアで同じメソッドを呼び出すまで実行を中断します。 ** 必要な数のスレッドが
await()
を呼び出しているこの状況は、
バリアの除去
と呼ばれています。必要に応じて、コンストラクタに2番目の引数を渡すことができます。これは
Runnable
インスタンスです。これには、バリアを通過する最後のスレッドによって実行されるロジックがあります。public CyclicBarrier(int parties, Runnable barrierAction)
===
5実装
CyclicBarrier
の動作を確認するために、次のシナリオを考えましょう。固定数のスレッドが実行し、対応する結果をリストに格納する操作があります。すべてのスレッドがそのアクションの実行を終了すると、それらのうちの1つ(通常はバリアを通過する最後のスレッド)が、これらのそれぞれによってフェッチされたデータの処理を開始します。
すべてのアクションが行われるメインクラスを実装しましょう。
public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List<List<Integer>> partialResults = Collections.synchronizedList(new ArrayList<>()); private Random random = new Random(); private int NUM__PARTIAL__RESULTS; private int NUM__WORKERS; //... }
このクラスはかなり単純です –
NUM
WORKERS
は実行しようとしているスレッドの数、
NUM
PARTIAL
RESULTS__は各ワーカースレッドが生成しようとしている結果の数です。最後に、
partialResults
があります。これは、これらの各ワーカースレッドの結果を格納するリストです。複数のスレッドが同時にリストに書き込むため、このリストは
SynchronizedList
であり、
add()
メソッドはプレーンな
ArrayList
ではスレッドセーフではありません。それでは、各ワーカースレッドのロジックを実装しましょう。
public class CyclicBarrierDemo { //... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List<Integer> partialResult = new ArrayList<>(); //Crunch some numbers and store the partial result for (int i = 0; i < NUM__PARTIAL__RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { //... } catch (BrokenBarrierException e) { //... } } } }
バリアがトリップされたときに実行されるロジックを実装します。
簡単にするために、部分結果リストにすべての番号を追加しましょう。
public class CyclicBarrierDemo { //... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM__WORKERS + " workers, having " + NUM__PARTIAL__RESULTS + " results each."); int sum = 0; for (List<Integer> threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }
最後のステップは
CyclicBarrier
を構築し、
main()
メソッドで開始することです。public class CyclicBarrierDemo { //Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM__PARTIAL__RESULTS = numberOfPartialResults; NUM__WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM__WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM__WORKERS + " worker threads to compute " + NUM__PARTIAL__RESULTS + " partial results each"); for (int i = 0; i < NUM__WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[]args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } }
上記のコードでは、それぞれが計算の一部として3つの整数を生成し、結果のリストに同じ値を格納する5つのスレッドで巡回バリアを初期化しました。
バリアがトリップされると、バリアをトリップした最後のスレッドがAggregatorThreadで指定されたロジックを実行します。つまり、スレッドによって生成されたすべての番号を加算します。
===
6. 結果
これは上記のプログラムの1回の実行による出力です。スレッドが異なる順序で生成される可能性があるため、実行ごとに異なる結果が生じる可能性があります。
Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46
上記の出力が示すように、
Thread 4
はバリアをトリップし、最終的な集計ロジックも実行するものです。上記の例が示すように、スレッドが実際に開始された順序で実行される必要もありません。===
7. 結論
この記事では、
CyclicBarrier
とは何か、またそれがどのような状況で役立つかについて説明しました。他のプログラムロジックを続行する前に、固定実行ポイントに到達するために固定数のスレッドが必要なシナリオも実装しました。
いつものように、このチュートリアルのコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[GitHubで動く]にあります。