1前書き

この記事では、


CountDownLatch


クラスについて説明し、その使用方法を説明します。いくつかの実用的な例で使用されています。

基本的に、

CountDownLatch

を使うことで、他のスレッドが与えられたタスクを完了するまでスレッドをブロックさせることができます。


2並行プログラミングでの使用法

簡単に言うと、

CountDownLatch

には

counter

フィールドがあり、これは必要に応じて減らすことができます。その後、ゼロまでカウントダウンされるまで呼び出しスレッドをブロックするためにそれを使用できます。

並列処理をしているのであれば、

CountDownLatch

をカウンターの値として、作業したいスレッドの数と同じ値でインスタンス化することができます。次に、各スレッドが終了した後に

countdown()

を呼び出すだけで、

await()

を呼び出す従属スレッドはワーカースレッドが終了するまでブロックされます。


3スレッドのプールが完了するのを待っている


Worker

を作成し、

CountDownLatch

フィールドを使用して完了したことを知らせることで、このパターンを試してみましょう。

public class Worker implements Runnable {
    private List<String> outputScraper;
    private CountDownLatch countDownLatch;

    public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

それから、

Worker

インスタンスが完了するのを待つために

CountDownLatch

を取得できることを証明するためのテストを作成しましょう。

@Test
public void whenParallelProcessing__thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

      workers.forEach(Thread::start);
      countDownLatch.await();
      outputScraper.add("Latch released");

      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }


CountDownLatch

リリースに依存するため、当然のことながら、「リリースされたラッチ」は常に最後の出力になります。


await()

を呼び出さないと、スレッドの実行順序を保証できないため、テストはランダムに失敗します。

4.

開始を待っているスレッドのプール

前の例を使用したが、今回は5つではなく何千ものスレッドを開始した場合、後のスレッドで

start()

を呼び出す前に、前のスレッドの多くが処理を終了する可能性があります。すべてのスレッドを並行して実行させることができないため、これにより同時実行性の問題を試して再現することが困難になる可能性があります。

これを回避するために、

CountdownLatch

を前の例とは異なる動作にします。いくつかの子スレッドが終了するまで親スレッドをブロックする代わりに、他のすべてのスレッドが開始されるまで各子スレッドをブロックできます。

処理前にブロックされるように

run()

メソッドを変更しましょう。

public class WaitingWorker implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public WaitingWorker(
      List<String> outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

それでは、すべての

Workers

が開始されるまでブロックし、

Workers

のブロックを解除してから

Workers

が終了するまでブロックするようにテストを変更しましょう。

@Test
public void whenDoingLotsOfThreadsInParallel__thenStartThemAtTheSameTime()
 throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await();
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown();
    completedThreadCounter.await();
    outputScraper.add("Workers complete");

    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

このパターンは、何千ものスレッドにいくつかのロジックを並列に実行させようとするために使用される可能性があるため、並行性のバグを再現するのに非常に役立ちます。


5

CountdownLatch

早期終了

時々、

CountDownLatchをカウントダウンする前に

Workers__が誤って終了するという状況に遭遇するかもしれません。

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}


await()

が永遠にブロックされる方法を示すために、

BrokenWorker

を使用するように以前のテストを変更しましょう。

@Test
public void whenFailingToParallelProcess__thenMainThreadShouldGetNotGetStuck()
  throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    countDownLatch.await();
}

明らかに、これは私たちが望む振る舞いではありません – 無限にブロックするよりもアプリケーションを続ける方がはるかに良いでしょう。

これを回避するために、__await()への呼び出しにタイムアウト引数を追加しましょう。

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

ご覧のとおり、テストは最終的にタイムアウトし、

await()



false

を返します。


6. 結論

このクイックガイドでは、他のスレッドが何らかの処理を終了するまでスレッドをブロックするために

CountDownLatch

を使用する方法を説明しました。

スレッドが確実に並列実行されるようにすることで、並行性の問題のデバッグにどのように使用できるかについても説明しました。

これらの例の実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]で見つけることができます。これはMavenベースのプロジェクトなので、そのまま実行するのは簡単なはずです。