1. 概要

このチュートリアルでは、Javaの最も基本的なメカニズムの1つであるスレッド同期について説明します。

最初に、いくつかの重要な並行性関連の用語と方法論について説明します。

そして、 wait() notify()をよりよく理解することを目的として、並行性の問題に対処する簡単なアプリケーションを開発します。

2. Javaでのスレッド同期

マルチスレッド環境では、複数のスレッドが同じリソースを変更しようとする場合があります。 もちろん、スレッドを適切に管理しないと、一貫性の問題が発生します。

2.1. Javaの保護されたブロック

Javaで複数のスレッドのアクションを調整するために使用できるツールの1つは、保護されたブロックです。 このようなブロックは、実行を再開する前に特定の条件をチェックし続けます。

それを念頭に置いて、以下を利用します。

  • Object.wait()はスレッドを一時停止します
  • Object.notify()スレッドをウェイクアップします

これは、スレッドのライフサイクルを示す次の図からよりよく理解できます。

このライフサイクルを制御する方法はたくさんあることに注意してください。 ただし、この記事では、 wait() notify()のみに焦点を当てます。

3. wait()メソッド

簡単に言うと、 wait()を呼び出すと、現在のスレッドは、他のスレッドが同じオブジェクトで notify()または notifyAll()を呼び出すまで待機します。

このため、現在のスレッドはオブジェクトのモニターを所有している必要があります。 Javadocs によると、これは次の方法で発生する可能性があります。

  • 指定されたオブジェクトに対してsynchronizedインスタンスメソッドを実行したとき
  • 指定されたオブジェクトで同期ブロックの本体を実行したとき
  • タイプクラスのオブジェクトに対して同期静的メソッドを実行する

一度に1つのアクティブなスレッドのみがオブジェクトのモニターを所有できることに注意してください。

このwait()メソッドには、3つのオーバーロードされたシグネチャが付属しています。 これらを見てみましょう。

3.1. wait()

wait()メソッドは、別のスレッドがこのオブジェクトに対して notify()を呼び出すか、 notifyAll()を呼び出すまで、現在のスレッドを無期限に待機させます。

3.2. 待機(長いタイムアウト)

このメソッドを使用すると、スレッドが自動的にウェイクアップされるまでのタイムアウトを指定できます。 notify()または notifyAll()を使用して、タイムアウトに達する前にスレッドをウェイクアップできます。

wait(0)を呼び出すことは、 wait()を呼び出すことと同じであることに注意してください。

3.3. wait(long timeout、int nanos)

これは、同じ機能を提供するさらに別の署名です。 ここでの唯一の違いは、より高い精度を提供できることです。

合計タイムアウト期間(ナノ秒単位)は、 1_000_000 * timeout +nanosとして計算されます。

4. notify()および notifyAll()

notify()メソッドを使用して、このオブジェクトのモニターへのアクセスを待機しているスレッドをウェイクアップします。

待機中のスレッドに通知する方法は2つあります。

4.1. notify()

このオブジェクトのモニターで待機しているすべてのスレッド( wait()メソッドのいずれかを使用)について、メソッド notify()は、任意にウェイクアップするようにいずれかのスレッドに通知します。 ウェイクするスレッドの正確な選択は非決定的であり、実装によって異なります。

notify()は単一のランダムスレッドをウェイクアップするため、これを使用して、スレッドが同様のタスクを実行している場合に相互に排他的なロックを実装できます。 ただし、ほとんどの場合、 notifyAll()を実装する方が実行可能です。

4.2. notifyAll()

このメソッドは、このオブジェクトのモニターで待機しているすべてのスレッドを単にウェイクアップします。

目覚めたスレッドは、他のスレッドと同様に、通常の方法で完了します。

ただし、実行を続行する前に、常にスレッドを続行するために必要な条件のクイックチェックを定義します。これは、通知を受信せずにスレッドがウェイクアップする場合があるためです(これはシナリオについては、後の例で説明します)。

5. 送受信者の同期の問題

基本を理解したところで、 wait() notify()を利用する簡単な Sender Receiverアプリケーションを見ていきましょう。 それらの間の同期を設定する方法:

  • 送信者は、データパケットを受信者に送信することになっています。
  • Receiver は、Senderがデータパケットの送信を終了するまでデータパケットを処理できません。
  • 同様に、 Sender は、 Receiver が前のパケットをすでに処理していない限り、別のパケットを送信しようとしないでください。

まず、SenderからReceiverに送信されるデータpacketで構成されるDataクラスを作成しましょう。 wait() notifyAll()を使用して、これらの間の同期を設定します。

public class Data {
    private String packet;
    
    // True if receiver should wait
    // False if sender should wait
    private boolean transfer = true;
 
    public synchronized String receive() {
        while (transfer) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); 
                System.out.println("Thread Interrupted");
            }
        }
        transfer = true;
        
        String returnPacket = packet;
        notifyAll();
        return returnPacket;
    }
 
    public synchronized void send(String packet) {
        while (!transfer) {
            try { 
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); 
                System.out.println("Thread Interrupted");
            }
        }
        transfer = false;
        
        this.packet = packet;
        notifyAll();
    }
}

ここで何が起こっているのかを分析してみましょう。

  • packet 変数は、ネットワークを介して転送されるデータを示します。
  • boolean変数transferがあり、SenderReceiverが同期に使用します。
    • この変数がtrueの場合、ReceiverSenderがメッセージを送信するのを待つ必要があります。
    • false の場合、SenderReceiverがメッセージを受信するのを待つ必要があります。
  • Sender は、 send()メソッドを使用して、Receiverにデータを送信します。
    • transferfalseの場合、このスレッドで wait()を呼び出して待機します。
    • ただし、 true の場合は、ステータスを切り替えてメッセージを設定し、 notifyAll()を呼び出して他のスレッドをウェイクアップし、重要なイベントが発生したことを指定します。実行を継続できます。
  • 同様に、 Receiverreceive()メソッドを使用します。
    • transferSenderによってfalseに設定されている場合にのみ続行されます。それ以外の場合は、 wait()を呼び出します。スレッド。
    • 条件が満たされると、ステータスを切り替え、待機中のすべてのスレッドにウェイクアップを通知し、受信したデータパケットを返します。

5.1. wait() while ループで囲むのはなぜですか?

notify()および notifyAll()は、このオブジェクトのモニターで待機しているスレッドをランダムにウェイクアップするため、条件が満たされていることが常に重要であるとは限りません。 スレッドがウェイクアップされることもありますが、実際にはまだ条件が満たされていません。

また、スプリアスウェイクアップから私たちを救うためのチェックを定義することもできます。この場合、スレッドは通知を受信せずに待機からウェイクアップできます。

5.2. s end()メソッドと receive()メソッドを同期する必要があるのはなぜですか?

これらのメソッドを同期されたメソッド内に配置して、組み込みロックを提供しました。 wait()メソッドを呼び出すスレッドが固有のロックを所有していない場合、エラーがスローされます。

次に、SenderReceiverを作成し、両方に Runnable インターフェースを実装して、それらのインスタンスをスレッドで実行できるようにします。

まず、Senderがどのように機能するかを確認します。

public class Sender implements Runnable {
    private Data data;
 
    // standard constructors
 
    public void run() {
        String packets[] = {
          "First packet",
          "Second packet",
          "Third packet",
          "Fourth packet",
          "End"
        };
 
        for (String packet : packets) {
            data.send(packet);

            // Thread.sleep() to mimic heavy server-side processing
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); 
                Log.error("Thread interrupted", e); 
            }
        }
    }
}

この送信者を詳しく見てみましょう。

  • packets []arrayでネットワークを介して送信されるランダムなデータパケットを作成しています。
  • パケットごとに、単にsend()を呼び出しています。
  • 次に、 Thread.sleep()をランダムな間隔で呼び出して、サーバー側の重い処理を模倣します。

最後に、Receiverを実装しましょう。

public class Receiver implements Runnable {
    private Data load;
 
    // standard constructors
 
    public void run() {
        for(String receivedMessage = load.receive();
          !"End".equals(receivedMessage);
          receivedMessage = load.receive()) {
            
            System.out.println(receivedMessage);

            // ...
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); 
                Log.error("Thread interrupted", e); 
            }
        }
    }
}

ここでは、最後の「End」データパケットを取得するまで、ループ内で load.receive()を呼び出しています。

このアプリケーションの動作を見てみましょう。

public static void main(String[] args) {
    Data data = new Data();
    Thread sender = new Thread(new Sender(data));
    Thread receiver = new Thread(new Receiver(data));
    
    sender.start();
    receiver.start();
}

次の出力が表示されます。

First packet
Second packet
Third packet
Fourth packet

そして、ここにいます。 すべてのデータパケットを正しい順序で受信し、送信者と受信者の間の正しい通信を正常に確立しました。

6. 結論

この記事では、Javaのコア同期の概念について説明しました。 具体的には、 wait()および notify()を使用して、興味深い同期の問題を解決する方法に焦点を当てました。 最後に、これらの概念を実際に適用したコードサンプルを確認しました。

最後に、 wait() notify() notifyAll()などのこれらすべての低レベルAPIは従来のメソッドであることに注意してください。これはうまく機能しますが、JavaのネイティブLockおよびConditionインターフェイス( java .utilで利用可能)など、より高レベルのメカニズムの方が単純で優れていることがよくあります。 .concurrent.locks パッケージ)。

java.util.concurrent パッケージの詳細については、java.util.concurrentの記事の概要を参照してください。 また、LockおよびConditionは、java.util.concurrent.Locksガイドで説明されています。

いつものように、この記事で使用されている完全なコードスニペットは、GitHubで入手できます。