1. 序章

この記事では、 CountDownLatch クラスのガイドを提供し、いくつかの実用的な例でどのように使用できるかを示します。

基本的に、 CountDownLatch を使用することで、他のスレッドが特定のタスクを完了するまでスレッドをブロックさせることができます。

2. 並行プログラミングでの使用法

簡単に言うと、CountDownLatchにはcounterフィールドがあり、必要に応じてデクリメントできます。 次に、それを使用して、ゼロまでカウントダウンされるまで呼び出し元のスレッドをブロックできます。

並列処理を行っている場合は、処理するスレッドの数と同じカウンターの値を使用して、CountDownLatchをインスタンス化できます。 次に、各スレッドが終了した後に countdown()を呼び出すだけで、 await()を呼び出す依存スレッドがワーカースレッドが終了するまでブロックされることが保証されます。

3. スレッドのプールが完了するのを待っています

Worker を作成し、 CountDownLatch フィールドを使用して、完了したことを通知することにより、このパターンを試してみましょう。

public class Worker implements Runnable {
    private List<String> outputScraper;
    private CountDownLatch countDownLatch;

    public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

次に、CountDownLatchを取得してWorkerインスタンスが完了するのを待機できることを証明するために、テストを作成しましょう。

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

      workers.forEach(Thread::start);
      countDownLatch.await(); 
      outputScraper.add("Latch released");

      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }

CountDownLatch のリリースに依存するため、当然、「ラッチリリース」が常に最後の出力になります。

await()を呼び出さなかった場合、スレッドの実行の順序を保証できないため、テストはランダムに失敗することに注意してください。

4. 開始を待機しているスレッドのプール

前の例を取り上げたが、今回は5つではなく数千のスレッドを開始した場合、後のスレッドで start()を呼び出す前に、前のスレッドの多くが処理を終了している可能性があります。 これにより、すべてのスレッドを並行して実行することができないため、同時実行の問題を再現することが困難になる可能性があります。

これを回避するために、CountdownLatchが前の例とは異なる動作をするようにしましょう。 一部の子スレッドが終了するまで親スレッドをブロックする代わりに、他のすべてのスレッドが開始するまで各子スレッドをブロックできます。

run()メソッドを変更して、処理前にブロックするようにします。

public class WaitingWorker implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public WaitingWorker(
      List<String> outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

次に、すべての Workers が開始するまでブロックし、 Workers、のブロックを解除してから、Workersが終了するまでブロックするようにテストを変更しましょう。

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
 throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await(); 
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown(); 
    completedThreadCounter.await(); 
    outputScraper.add("Workers complete");

    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

このパターンは、並行性のバグを再現しようとする場合に非常に役立ちます。これは、何千ものスレッドにいくつかのロジックを並行して実行させるために使用できるためです。

5. CountdownLatchを早期に終了する

時々、私たちは状況に遭遇するかもしれません労働者カウントダウンする前にエラーで終了する CountDownLatch。 これにより、ゼロに到達せず、 待つ() 決して終了しない:

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}

await()が永久にブロックする方法を示すために、 BrokenWorker、を使用するように以前のテストを変更してみましょう。

@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
  throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    countDownLatch.await();
}

明らかに、これは私たちが望む動作ではありません。アプリケーションが無限にブロックするよりも継続する方がはるかに優れています。

これを回避するために、 await()。の呼び出しにタイムアウト引数を追加しましょう。

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

ご覧のとおり、テストは最終的にタイムアウトになり、 await()falseを返します。

6. 結論

このクイックガイドでは、 CountDownLatch を使用して、他のスレッドが処理を完了するまでスレッドをブロックする方法を示しました。

また、スレッドが並列で実行されるようにすることで、同時実行の問題をデバッグするためにどのように使用できるかについても説明しました。

これらの例の実装は、GitHubにあります。 これはMavenベースのプロジェクトであるため、そのまま実行するのは簡単です。