1. 概要

この記事では、java.util.concurrentパッケージからのPhaserコンストラクトについて説明します。 これは、 CountDownLatch と非常によく似た構造であり、スレッドの実行を調整できます。 CountDownLatch と比較すると、いくつかの追加機能があります。

Phaser は、動的な数のスレッドが実行を続行する前に待機する必要があるバリアです。 CountDownLatch では、その番号を動的に構成することはできず、インスタンスの作成時に指定する必要があります。

2. Phaser API

Phaser を使用すると、スレッドが次の実行ステップに進む前にバリアで待機する必要があるロジックを構築できます。

プログラムフェーズごとにPhaserインスタンスを再利用して、実行の複数のフェーズを調整できます。 各フェーズには、別のフェーズへの進行を待機する異なる数のスレッドを含めることができます。 フェーズの使用例については、後で説明します。

調整に参加するには、スレッドは register()自身をPhaserインスタンスに登録する必要があります。 これは登録されたパーティの数を増やすだけであり、現在のスレッドが登録されているかどうかを確認できないことに注意してください。これをサポートするには、実装をサブクラス化する必要があります。

スレッドは、ブロッキングメソッドである arriveAndAwaitAdvance()を呼び出すことにより、バリアに到達したことを通知します。 到着したパーティの数が登録されたパーティの数と同じになると、プログラムの実行が続行され、フェーズ数が増加します。 getPhase()メソッドを呼び出すことで、現在のフェーズ番号を取得できます。

スレッドがジョブを終了したら、 arriveAndDeregister()メソッドを呼び出して、現在のスレッドがこの特定のフェーズで考慮されないようにする必要があることを通知する必要があります。

3. PhaserAPIを使用したロジックの実装

アクションの複数のフェーズを調整したいとします。 3つのスレッドが最初のフェーズを処理し、2つのスレッドが2番目のフェーズを処理します。

Runnableインターフェイスを実装するLongRunningActionクラスを作成します。

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        ph.register();
    }

    @Override
    public void run() {
        ph.arriveAndAwaitAdvance();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ph.arriveAndDeregister();
    }
}

アクションクラスがインスタンス化されると、 register()メソッドを使用してPhaserインスタンスに登録されます。 これにより、その特定のスレッドを使用するスレッドの数が増加します Phaser。

arriveAndAwaitAdvance()を呼び出すと、現在のスレッドがバリアで待機します。 すでに述べたように、到着したパーティの数が登録されたパーティの数と同じになった場合、実行は続行されます。

処理が完了した後、現在のスレッドは arriveAndDeregister()メソッドを呼び出して自身の登録を解除しています。

3つのLongRunningActionスレッドを開始し、バリアでブロックするテストケースを作成しましょう。 次に、アクションが終了した後、次のフェーズの処理を実行する2つの追加のLongRunningActionスレッドを作成します。

メインスレッドからPhaserインスタンスを作成する場合、引数として1を渡します。 これは、現在のスレッドから register()メソッドを呼び出すのと同じです。 これを行うのは、3つのワーカースレッドを作成する場合、メインスレッドがコーディネーターであるため、Phaserに4つのスレッドを登録する必要があるためです。

ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);
 
assertEquals(0, ph.getPhase());

初期化後のフェーズはゼロに等しくなります。

Phaser クラスには、親インスタンスを渡すことができるコンストラクターがあります。 これは、同期の競合コストが非常に高くなるパーティが多数ある場合に役立ちます。 このような状況では、 Phasers のインスタンスを設定して、サブフェイザーのグループが共通の親を共有するようにすることができます。

次に、3つの LongRunningAction アクションスレッドを開始しましょう。メインスレッドからarriveAndAwaitAdvance()メソッドを呼び出すまで、バリアで待機します。

Phaser1で初期化し、 register()をさらに3回呼び出したことを覚えておいてください。 現在、3つのアクションスレッドがバリアに到達したことを発表しているため、 arriveAndAwaitAdvance()の呼び出しがもう1つ必要です。メインスレッドからの呼び出しです。

executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));

ph.arriveAndAwaitAdvance();
 
assertEquals(1, ph.getPhase());

そのフェーズの完了後、プログラムが実行の最初のステップの処理を終了したため、 getPhase()メソッドは1つを返します。

2つのスレッドが処理の次のフェーズを実行する必要があるとしましょう。 Phaser を活用して、バリアで待機するスレッドの数を動的に構成できるため、これを実現できます。 2つの新しいスレッドを開始していますが、メインスレッドから arriveAndAwaitAdvance()が呼び出されるまで、これらのスレッドは実行されません(前の場合と同じ)。

executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();
 
assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

この後、 getPhase()メソッドは2に等しいフェーズ番号を返します。 プログラムを終了したいときは、 ArrivateAndDeregister() メインスレッドとしてのメソッドはまだに登録されています Phaser。 登録抹消により登録当事者数がゼロになると、 Phaser終了しました。 同期メソッドへのすべての呼び出しはブロックされなくなり、すぐに戻ります。

プログラムを実行すると、次の出力が生成されます(print lineステートメントを含む完全なソースコードはコードリポジトリにあります)。

This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action

バリアが開くまで、すべてのスレッドが実行を待機していることがわかります。 実行の次のフェーズは、前のフェーズが正常に終了した場合にのみ実行されます。

4. 結論

このチュートリアルでは、java.util.concurrentからのPhaserコンストラクトを確認し、Phaserクラスを使用して複数のフェーズで調整ロジックを実装しました。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。