1. 概要

マルチスレッドプログラミングにより、スレッドを同時に実行でき、各スレッドは異なるタスクを処理できます。 したがって、特にコンピュータに複数のマルチコアCPUまたは複数のCPUがある場合は、リソースを最適に利用します。

複数のスレッドを同時に開始するように制御したい場合があります。

このチュートリアルでは、最初に要件、特に「まったく同じ時間」の意味を理解します。 さらに、Javaで2つのスレッドを同時に開始する方法についても説明します。

2. 要件を理解する

要件は、「2つのスレッドをまったく同時に開始する」ことです。

この要件は理解しやすいように見えます。 しかし、よく考えてみると、 EXACT で2つのスレッドを同時に起動することも可能ですか?

まず第一に、各スレッドは動作するためにCPU時間を消費します。 したがって、アプリケーションがシングルコアCPUを搭載したコンピューターで実行されている場合、2つのスレッドを同時に開始することは不可能です。

コンピューターにマルチコアCPUまたは複数のCPUがある場合、2つのスレッドが正確なで同時に開始される可能性があります。 ただし、Java側では制御できません。

これは、Javaでスレッドを操作する場合、Javaスレッドスケジューリングがオペレーティングシステムのスレッドスケジューリングに依存するためです。 したがって、オペレーティングシステムが異なれば、処理も異なる場合があります。

さらに、アインシュタインの特殊相対性理論によると、より厳密な方法で「まったく同じ時間」について議論すると、次のようになります。

それらのイベントが空間で分離されている場合、2つの異なるイベントが同時に発生すると絶対的な意味で言うことは不可能です。

CPUがマザーボードまたはCPUに配置されたコアにどれだけ接近していても、スペースがあります。 したがって、2つのスレッドがEXACTで同時に開始することを保証することはできません。

それで、それは要件が無効であることを意味しますか?

いいえ。 これは有効な要件です。 EXACT で2つのスレッドを同時に開始できない場合でも、いくつかの同期手法を使用してかなり近づけることができます。

これらの手法は、「同時に」開始するために2つのスレッドが必要なほとんどの実際的なケースで役立ちます。

このチュートリアルでは、この問題を解決するための2つのアプローチについて説明します。

  • CountDownLatchクラスの使用
  • CyclicBarrierクラスの使用
  • Phaserクラスの使用

すべてのアプローチは同じ考え方に従います。実際には、2つのスレッドを同時に開始することはありません。 代わりに、スレッドが開始した直後にスレッドをブロックし、同時に実行を再開しようとします。

テストはスレッドのスケジューリングに関連しているため、このチュートリアルでテストを実行するための環境について言及する価値があります。

  • CPU:Intel(R)Core(TM)i7-8850HCPU。 プロセッサクロックは2.6〜4.3 GHzです(4.1は4コア、4 GHzは6コア)
  • オペレーティングシステム:カーネルバージョン5.12.12を搭載した64ビットLinux
  • Java:Java 11

それでは、CountDonwLatchCyclicBarrierの動作を見てみましょう。

3. CountDownLatchクラスの使用

CountDownLatch は、java.util.concurrentパッケージの一部としてJava5で導入されたシンクロナイザーです。 通常、 CountDownLatchを使用して、他のスレッドがタスクを完了するまでスレッドをブロックします。

簡単に言うと、ラッチオブジェクトに count を設定し、ラッチオブジェクトをいくつかのスレッドに関連付けます。 これらのスレッドを開始すると、ラッチのカウントがゼロになるまでブロックされます。

一方、他のスレッドでは、 count を減らして、たとえばメインスレッドの一部のタスクが実行されたときに、ブロックされたスレッドを再開させる条件を制御できます。

3.1. ワーカースレッド

それでは、CountDownLatchクラスを使用して問題を解決する方法を見てみましょう。

まず、Threadクラスを作成します。 それをWorkerWithCountDownLatchと呼びましょう:

public class WorkerWithCountDownLatch extends Thread {
    private CountDownLatch latch;

    public WorkerWithCountDownLatch(String name, CountDownLatch latch) {
        this.latch = latch;
        setName(name);
    }

    @Override public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the latch...\n", getName());
            latch.await();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (InterruptedException e) {
            // handle exception
        }
    }

ラッチオブジェクトをWorkerWithCountDownLatchクラスに追加しました。 まず、ラッチオブジェクトの機能を理解しましょう。

の中に走る() メソッド、メソッドを呼び出しますラッチ.await()。 つまり、私たちが始めた場合ワーカースレッド、それはチェックしますラッチの数。 スレッドは、 カウントゼロです。

このようにして、メインスレッドで count = 1を使用してCountDownLatch(1)ラッチを作成し、ラッチオブジェクトを必要な2つのワーカースレッドに関連付けることができます。同時に開始します。

2つのスレッドが実際のジョブの実行を再開するようにするには、メインスレッドでラッチ.countDown()を呼び出してラッチを解放します。

次に、メインスレッドが2つのワーカースレッドをどのように制御するかを見てみましょう。

3.2. メインスレッド

usingCountDownLatch()メソッドにメインスレッドを実装します。

private static void usingCountDownLatch() throws InterruptedException {
    System.out.println("===============================================");
    System.out.println("        >>> Using CountDownLatch <<<<");
    System.out.println("===============================================");

    CountDownLatch latch = new CountDownLatch(1);

    WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch);
    WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch);

    worker1.start();
    worker2.start();

    Thread.sleep(10);//simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now release the latch:");
    System.out.println("-----------------------------------------------");
    latch.countDown();
}

次に、上記の usingCountDownLatch()メソッドを main()メソッドから呼び出します。 main()メソッドを実行すると、次の出力が表示されます。

===============================================
        >>> Using CountDownLatch <<<<
===============================================
[ Worker with latch 1 ] created, blocked by the latch
[ Worker with latch 2 ] created, blocked by the latch
-----------------------------------------------
 Now release the latch:
-----------------------------------------------
[ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z
[ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z

上記の出力が示すように、 2つのワーカースレッドがほぼ同時に開始されました。 2つの開始時間の差は2マイクロ秒未満です

4. CyclicBarrierクラスの使用

CyclicBarrier クラスは、Java5で導入されたもう1つのシンクロナイザーです。 基本的に、 CyclicBarrierを使用すると、固定数のスレッドが、実行を続行する前に、互いに共通のポイントに到達するのを待つことができます

次に、CyclicBarrierクラスを使用して問題を解決する方法を見てみましょう。

4.1. ワーカースレッド

まず、ワーカースレッドの実装を見てみましょう。

public class WorkerWithCyclicBarrier extends Thread {
    private CyclicBarrier barrier;

    public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) {
        this.barrier = barrier;
        this.setName(name);
    }

    @Override public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the barrier\n", getName());
            barrier.await();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (InterruptedException | BrokenBarrierException e) {
            // handle exception
        }
    }
}

実装は非常に簡単です。 バリアオブジェクトをワーカースレッドに関連付けます。 スレッドが開始すると、すぐに surface.await()メソッドを呼び出します。

このようにして、ワーカースレッドはブロックされ、すべての関係者が surface.await()を呼び出して再開するのを待機します。

4.2. メインスレッド

次に、メインスレッドで再開する2つのワーカースレッドを制御する方法を見てみましょう。

private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException {
    System.out.println("\n===============================================");
    System.out.println("        >>> Using CyclicBarrier <<<<");
    System.out.println("===============================================");

    CyclicBarrier barrier = new CyclicBarrier(3);

    WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier);
    WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier);

    worker1.start();
    worker2.start();

    Thread.sleep(10);//simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now open the barrier:");
    System.out.println("-----------------------------------------------");
    barrier.await();
}

私たちの目標は、2つのワーカースレッドを同時に再開できるようにすることです。 つまり、メインスレッドと合わせて、合計3つのスレッドがあります。

上記の方法が示すように、メインスレッドに3つのパーティを持つバリアオブジェクトを作成します。 次に、2つのワーカースレッドを作成して開始します。

前に説明したように、2つのワーカースレッドはブロックされ、バリアのオープンが再開するのを待っています。

メインスレッドでは、実際の作業を行うことができます。 バリアを開くことにした場合、メソッド surface.await()を呼び出して、2人のワーカーが実行を継続できるようにします。

main()メソッドで usingCyclicBarrier()を呼び出すと、次の出力が得られます。

===============================================
        >>> Using CyclicBarrier <<<<
===============================================
[ Worker with barrier 1 ] created, blocked by the barrier
[ Worker with barrier 2 ] created, blocked by the barrier
-----------------------------------------------
 Now open the barrier:
-----------------------------------------------
[ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z
[ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z

ワーカーの2つの開始時間を比較できます。 2人の作業者がまったく同時に開始しなかったとしても、私たちは目標にかなり近づいています。2つの開始時間の差は3マイクロ秒未満です。

5. Phaserクラスの使用

Phaser クラスは、Java7で導入されたシンクロナイザーです。 CyclicBarrierおよびCountDownLatchに似ています。 ただし、Phaserクラスの方が柔軟性があります。

たとえば、CyclicBarrierCountDownLatchとは異なり、Phaserではスレッドパーティを動的に登録できます。

次に、Phaserを使用して問題を解決しましょう。

5.1. ワーカースレッド

いつものように、最初に実装を見てから、それがどのように機能するかを理解します。

public class WorkerWithPhaser extends Thread {
    private Phaser phaser;

    public WorkerWithPhaser(String name, Phaser phaser) {
        this.phaser = phaser;
        phaser.register();
        setName(name);
    }

    @Override public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the phaser\n", getName());
            phaser.arriveAndAwaitAdvance();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (IllegalStateException e) {
            // handle exception
        }
    }
}

ワーカースレッドがインスタンス化されると、 phaser.register()を呼び出して、現在のスレッドを指定されたPhaserオブジェクトに登録します。 このようにして、現在の作業はphaserバリアの1つのスレッドパーティになります。

次に、ワーカースレッドが開始すると、すぐに phaser.arriveAndAwaitAdvance()を呼び出します。 したがって、 phaser に、現在のスレッドが到着したことを通知し、他のスレッドパーティの到着が続行されるのを待ちます。 もちろん、他のスレッドパーティが到着する前に、現在のスレッドはブロックされます。

5.2. メインスレッド

次に、次に進み、メインスレッドの実装を見てみましょう。

private static void usingPhaser() throws InterruptedException {
    System.out.println("\n===============================================");
    System.out.println("        >>> Using Phaser <<<");
    System.out.println("===============================================");

    Phaser phaser = new Phaser();
    phaser.register();

    WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser);
    WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser);

    worker1.start();
    worker2.start();

    Thread.sleep(10);//simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now open the phaser barrier:");
    System.out.println("-----------------------------------------------");
    phaser.arriveAndAwaitAdvance();
}

上記のコードでは、ご覧のとおり、メインスレッドはそれ自体をPhaserオブジェクトのスレッドパーティとして登録します。

2つのworkerスレッドを作成してブロックした後、メインスレッドは phaser.arriveAndAwaitAdvance()も呼び出します。 このようにして、フェイザーバリアを開き、2つのworkerスレッドを同時に再開できるようにします。

最後に、 main()メソッドで usingPhaser()メソッドを呼び出しましょう。

===============================================
        >>> Using Phaser <<<
===============================================
[ Worker with phaser 1 ] created, blocked by the phaser
[ Worker with phaser 2 ] created, blocked by the phaser
-----------------------------------------------
 Now open the phaser barrier:
-----------------------------------------------
[ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z
[ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z

同様に、 2つのワーカースレッドがほぼ同時に開始されました。 2つの開始時間の差は2マイクロ秒未満です

6. 結論

この記事では、最初に「2つのスレッドをまったく同時に開始する」という要件について説明しました。

次に、 CountDownLatch CyclicBarrier 、およびPhaserを使用して3つのスレッドを同時に開始する2つのアプローチについて説明しました。

彼らの考えは似ており、2つのスレッドをブロックし、同時に実行を再開させようとしています。

これらのアプローチでは、2つのスレッドが完全に同時に開始することを保証することはできませんが、結果はかなり近く、現実の世界のほとんどの場合に十分です。

いつものように、記事のコードはGitHubにあります。