1概要

この記事では、

java.util.concurrent

パッケージに含まれる


Phaser


構文について調べます。これは


CountDownLatch


と非常によく似た構成で、スレッドの実行を調整することができます。

CountDownLatch

と比較して、いくつかの追加機能があります。


Phaser

は、動的なスレッド数が実行を継続する前に待機する必要があるバリアです。

CountDownLatch

では、この番号を動的に設定することはできず、インスタンスを作成するときに指定する必要があります。


2

Phaser

API


Phaser

を使用すると、スレッドは次の実行ステップに進む前にバリアを待機する必要があるロジックを構築できます** 。

各プログラムフェーズに

Phaser

インスタンスを再利用して、実行の複数フェーズを調整できます。各フェーズは、別のフェーズに進むのを待つスレッドの数が異なる場合があります。後でフェーズを使用する例を見ていきます。

調整に参加するには、スレッドは

Phaser

インスタンスを使用して

register()

自身を実行する必要があります。これは登録されたパーティの数を増やすだけで、現在のスレッドが登録されているかどうかを確認することはできません。これをサポートするには実装をサブクラス化する必要があります。

スレッドは、ブロッキングメソッドである

arriveAndAwaitAdvance()

を呼び出すことによって、バリアに到達したことを通知します。

到着したパーティーの数が登録したパーティーの数と等しい場合、プログラムの実行は継続され

、フェーズ数は増加します。

getPhase()

メソッドを呼び出すことで現在のフェーズ番号を取得できます。

スレッドがそのジョブを終了したら、

arriveAndDeregister()

メソッドを呼び出して、現在のスレッドがこの特定のフェーズで考慮されなくなったことを通知する必要があります。


3

Phaser

APIを使用したロジックの実装

アクションの複数のフェーズを調整したいとしましょう。 3つのスレッドが最初のフェーズを処理し、2つのスレッドが2番目のフェーズを処理します。


Runnable

インターフェースを実装する

LongRunningAction

クラスを作成します。

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        ph.register();
    }

    @Override
    public void run() {
        ph.arriveAndAwaitAdvance();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ph.arriveAndDeregister();
    }
}

アクションクラスがインスタンス化されたら、

register()メソッドを使用して

Phaser

インスタンスに登録します。これにより、その特定の

Phaser.__を使用しているスレッドの数が増えます。


arriveAndAwaitAdvance()

を呼び出すと、現在のスレッドはバリアを待機します。すでに述べたように、到着したパーティーの数が登録したパーティーの数と同じになると、実行は続行されます。

処理が完了した後、現在のスレッドは

arriveAndDeregister()

メソッドを呼び出して自分自身の登録を解除しています。

3つの

LongRunningAction

スレッドを開始してバリアをブロックするテストケースを作成しましょう。次に、アクションが終了したら、次のフェーズの処理を実行する2つの追加の

LongRunningAction

スレッドを作成します。

メインスレッドから

Phaser

インスタンスを作成するときは、

1

を引数として渡します。これは、現在のスレッドから

register()

メソッドを呼び出すのと同じです。これは、3つのワーカースレッドを作成する場合、メインスレッドがコーディネーターになるため、

Phaser

に4つのスレッドを登録する必要があるためです。

ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);

assertEquals(0, ph.getPhase());

初期化後の位相はゼロです。


Phaser

クラスには、親インスタンスを渡すことができるコンストラクタがあります。大量の同期競合コストが発生する可能性があるパーティが多数ある場合に役立ちます。

そのような状況では、サブフェイザーのグループが共通の親を共有するように

Phasers

のインスタンスを設定することができます。

次に、3つの

LongRunningAction

アクションスレッドを開始しましょう。メインスレッドから

arriveAndAwaitAdvance()

メソッドを呼び出すまで、バリアを待機します。


Phaser



1

で初期化し、さらに3回

register()

を呼び出したことに注意してください。これで、3つのアクションスレッドがバリアに到達したことを発表したため、

arriveAndAwaitAdvance()

をもう一度呼び出す必要があります。メインスレッドからの呼び出しです。

executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));

ph.arriveAndAwaitAdvance();

assertEquals(1, ph.getPhase());

そのフェーズの完了後、

getPhase()

メソッドは1を返します。これは、プログラムが実行の最初のステップの処理を終了したためです。

2つのスレッドが次の処理段階を実行する必要があるとしましょう。

それを実現するために

Phaser

を利用することができます。バリアを待つスレッド数を動的に設定できるからです。 2つの新しいスレッドを開始していますが、メインスレッドから

arriveAndAwaitAdvance()

が呼び出されるまで実行されません(前の場合と同じ)。

executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();

assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

この後、

getPhase()

メソッドは2に等しいフェーズ番号を返します。プログラムを終了するときは、メインスレッドがまだ

Phaserに登録されているので

arriveAndDeregister()__メソッドを呼び出す必要があります。同期メソッドへの呼び出しはもうブロックされず、すぐに戻ります。

プログラムを実行すると、次のような出力が生成されます(print lineステートメントを含む完全なソースコードはコードリポジトリにあります)。

This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action

バリアが開くまで、すべてのスレッドが実行を待機していることがわかります。実行の次のフェーズは、前のフェーズが正常に終了したときにのみ実行されます。


4結論

このチュートリアルでは、

java.util.concurrent



Phaser

構文を見て、

Phaser

クラスを使用して複数のフェーズで調整ロジックを実装しました。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[GitHubプロジェクト]にあります – これはMavenプロジェクトです。そのままインポートして実行するのは簡単です。