1. 概要

このチュートリアルでは、Javaで生産者/消費者問題を実装する方法を学習します。 この問題は、制限付きバッファー問題とも呼ばれます。

この問題の詳細については、生産者/消費者問題のwikiページを参照してください。 Javaスレッド/同時実行の基本については、Java同時実行の記事を参照してください。

2. 生産者/消費者問題

プロデューサーとコンシューマーは2つの別個のプロセスです。 両方のプロセスは、共通のバッファーまたはキューを共有します。 プロデューサーは特定のデータを継続的に生成してバッファーにプッシュしますが、コンシューマーはそれらのデータをバッファーから消費します。

この単純なシナリオを示す図を確認してみましょう。

本質的に、この問題には対処すべき特定の複雑さがあります

  • プロデューサーとコンシューマーの両方が同時にキューを更新しようとする場合があります。 これにより、データの損失や不整合が発生する可能性があります。
  • 生産者は消費者より遅いかもしれません。 このような場合、コンシューマーは要素を高速に処理して待機します。
  • 場合によっては、コンシューマーはプロデューサーよりも遅くなる可能性があります。 この状況は、キューオーバーフローの問題につながります。
  • 実際のシナリオでは、複数のプロデューサー、複数のコンシューマー、またはその両方が存在する場合があります。 これにより、同じメッセージが異なるコンシューマーによって処理される可能性があります。

次の図は、複数のプロデューサーと複数のコンシューマーの場合を示しています。

いくつかの複雑さを解決するには、リソースの共有と同期を処理する必要があります。

  • データの追加と削除中のキューでの同期
  • キューが空の場合、コンシューマーはプロデューサーがキューに新しいデータを追加するまで待機する必要があります
  • キューがいっぱいになると、プロデューサーはコンシューマーがデータを消費し、キューに空のバッファーがあるまで待機する必要があります

3. スレッドを使用したJavaの例

問題のエンティティごとに個別のクラスを定義しました。

3.1. メッセージクラス

Message クラスは、生成されたデータを保持します。

public class Message {
    private int id;
    private double data;

    // constructors and getter/setters
}

データはどのタイプでもかまいません。 JSON文字列、複雑なオブジェクト、または単なる数値の場合があります。 また、データをMessageクラスにラップすることは必須ではありません。

3.2. DataQueueクラス

共有キューと関連オブジェクトは、DataQueueクラスにラップされます。

public class DataQueue {
    private final Queue<Message> queue = new LinkedList<>();
    private final int maxSize;
    private final Object FULL_QUEUE = new Object();
    private final Object EMPTY_QUEUE = new Object();

    DataQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    // other methods
}

制限付きバッファを作成するために、キューとそのmaxSizeが取得されます。

Javaでは、synchronizedブロックはオブジェクトを使用してスレッドの同期を実現します。 各オブジェクトには固有のロックがあります。最初にロックを取得したスレッドのみが同期ブロックの実行を許可されます。

ここでは、同期に使用する2つの参照FULL_QUEUEEMPTY_QUEUEを作成しました。 これらのハンドルには他の目的がないため、Objectクラスを使用して初期化しました。

キューがいっぱいになると、プロデューサーはFULL_QUEUEオブジェクトを待機します。 そして、消費者はメッセージを消費するとすぐに通知します。

プロデューサープロセスは、waitOnFullメソッドを呼び出します。

public void waitOnFull() throws InterruptedException {
    synchronized (FULL_QUEUE) {
        FULL_QUEUE.wait();
    }
}

そして、コンシューマープロセスは、notifyAllForFullメソッドを介してプロデューサーに通知します。

public void notifyAllForFull() {
    synchronized (FULL_QUEUE) {
        FULL_QUEUE.notifyAll();
    }
}

キューが空の場合、コンシューマーはEMPTY_QUEUEオブジェクトを待機します。 そして、メッセージがキューに追加されるとすぐに、プロデューサーはそれを通知します。

コンシューマプロセスは、waitOnEmptyメソッドを使用して待機します。

public void waitOnEmpty() throws InterruptedException {
    synchronized (EMPTY_QUEUE) {
        EMPTY_QUEUE.wait();
    }
}

プロデューサーは、notifyAllForEmptyメソッドを使用してコンシューマーに通知します。

public void notifyAllForEmpty() {
    synchronized (EMPTY_QUEUE) {
        EMPTY_QUEUE.notify();
    }
}

そして、プロデューサーは add()メソッドを使用して、キューにメッセージを追加します。

public void add(Message message) {
    synchronized (queue) {
        queue.add(message);
    }
}

コンシューマーはremoveメソッドを呼び出して、キューからメッセージを取得します。

public Message remove() {
    synchronized (queue) {
        return queue.poll();
    }
}

3.3. プロデューサークラス

Producer クラスは、 Runnable インターフェースを実装して、スレッドの作成を可能にします。

public class Producer implements Runnable {
    private final DataQueue dataQueue;
    private volatile boolean runFlag;

    public Producer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
        runFlag = true;
    }

    @Override
    public void run() {
        produce();
    }

    // Other methods
}

コンストラクターは、共有dataQueueパラメーターを使用します。 メンバー変数runFlagは、プロデューサープロセスを正常に停止するのに役立ちます。 trueに初期化されます。

スレッド開始はproduce()メソッドを呼び出します。

public void produce() {
    while (runFlag) {
        Message message = generateMessage();
        while (dataQueue.isFull()) {
            try {
                dataQueue.waitOnFull();
            } catch (InterruptedException e) {
                break;
            }
        }
        if (!runFlag) {
            break;
        }
        dataQueue.add(message);
        dataQueue.notifyAllForEmpty();
    }
}

プロデューサーは、whileループでステップを継続的に実行します。 runFlagfalseの場合、このループは中断されます。

各反復で、メッセージが生成されます。 次に、キューがいっぱいかどうかを確認し、必要に応じて待機します。 if ブロックの代わりに、whileループを使用してキューがいっぱいかどうかを確認します。 これは、待機状態からの誤ったウェイクアップを回避するためです。

プロデューサーは待機から復帰すると、続行する必要があるか、プロセスから抜け出す必要があるかを確認します。 キューにメッセージを追加し、空のキューで待機しているコンシューマーに通知します。

stop ()メソッドは、プロセスを正常に終了します。

public void stop() {
    runFlag = false;
    dataQueue.notifyAllForFull();
}

runFlagfalseに変更すると、「キューがいっぱい」状態で待機しているすべてのプロデューサーに通知されます。 これにより、すべてのプロデューサースレッドが確実に終了します。

3.4. 消費者クラス

Consumer クラスは、 Runnable を実装して、スレッドの作成を有効にします。

public class Consumer implements Runnable {
    private final DataQueue dataQueue;
    private volatile boolean runFlag;

    public Consumer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
        runFlag = true;
    }

    @Override
    public void run() {
        consume();
    }

    // Other methods
}

そのコンストラクターには、パラメーターとして共有dataQueueがあります。 runFlagtrueに初期化されます。 このフラグは、必要に応じてコンシューマープロセスを停止します。

スレッドが開始すると、consumeメソッドが実行されます

public void consume() {
    while (runFlag) {
        Message message;
        if (dataQueue.isEmpty()) {
            try {
                dataQueue.waitOnEmpty();
            } catch (InterruptedException e) {
                break;
            }
        }
        if (!runFlag) {
            break;
        }
        message = dataQueue.remove();
        dataQueue.notifyAllForFull();
        useMessage(message);
    }
}

whileループが継続的に実行されます。 また、runFlagfalseの場合、このプロセスは正常に停止します。

各反復は、キューが空であるかどうかをチェックします。 キューが空の場合、コンシューマーはメッセージが生成されるのを待ちます。 この待機は、 while ループでも使用され、誤ったウェイクアップを回避します。

コンシューマーが待機から復帰すると、runFlagをチェックします。 フラグがfalseの場合、ループから抜け出します。 それ以外の場合は、キューからメッセージを読み取り、「フルキュー」状態で待機していることをプロデューサーに通知します。 最後に、メッセージを消費します。

プロセスを正常に停止するには、 stop()メソッドを使用します。

public void stop() {
    runFlag = false;
    dataQueue.notifyAllForEmpty();
}

runFlagfalseに設定されると、空のキュー状態で待機しているすべてのコンシューマーに通知されます。 これにより、すべてのコンシューマスレッドが確実に終了します。

3.5. プロデューサースレッドとコンシューマースレッドの実行

必要な最大容量を持つdataQueueオブジェクトを作成しましょう。

DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);

それでは、プロデューサーオブジェクトとスレッドを作成しましょう。

Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);

次に、Consumerオブジェクトとスレッドを初期化します。

Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);

最後に、スレッドを開始してプロセスを開始します。

producerThread.start();
consumerThread.start();

それらのスレッドを停止するまで、継続的に実行されます。 それらを停止するのは簡単です:

producer.stop();
consumer.stop();

3.6. 複数のプロデューサーとコンシューマーを実行する

複数のプロデューサーとコンシューマーを実行することは、単一のプロデューサーとコンシューマーの場合と似ています。 必要な数のスレッドを作成して開始するだけです。

複数のプロデューサーとスレッドを作成して開始しましょう。

Producer producer = new Producer(dataQueue);
for(int i = 0; i < producerCount; i++) {
    Thread producerThread = new Thread(producer);
    producerThread.start();
}

次に、必要な数のコンシューマオブジェクトとスレッドを作成しましょう。

Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) {
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
}

プロデューサーオブジェクトとコンシューマーオブジェクトでstop()メソッドを呼び出すことにより、プロセスを適切に停止できます。

producer.stop();
consumer.stop();

4. BlockingQueueを使用した簡略化された例

Javaは、スレッドセーフなBlockingQueueインターフェースを提供します。 つまり、複数のスレッドは、同時実行の問題なしにこのキューに追加したり、このキューから削除したりできます

そのput()メソッドは、キューがいっぱいの場合、呼び出し元のスレッドをブロックします。 同様に、キューが空の場合、その take()メソッドは呼び出し元のスレッドをブロックします。

4.1. 境界付きBlockingQueueを作成します

コンストラクターの容量値を使用して、制限付きBlockingQueueを作成できます。

BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);

4.2. 簡略化されたプロデュースメソッド

produce()メソッドでは、キューの明示的な同期を回避できます。

private void produce() {
    while (true) {
        double value = generateValue();
        try {
            blockingQueue.put(value);
        } catch (InterruptedException e) {
            break;
        }
    }
}

このメソッドは継続的にオブジェクトを生成し、それらをキューに追加するだけです。

4.3. 簡略化された消費メソッド

consume()メソッドは、明示的に同期を使用しません。

private void consume() {
    while (true) {
        Double value;
        try {
            value = blockingQueue.take();
        } catch (InterruptedException e) {
            break;
        }
        // Consume value
    }
}

キューから値を取得し、それを継続的に消費します。

4.4. プロデューサースレッドとコンシューマースレッドを実行する

必要な数のプロデューサースレッドとコンシューマースレッドを作成できます。

for (int i = 0; i < 2; i++) {
    Thread producerThread = new Thread(this::produce);
    producerThread.start();
}

for (int i = 0; i < 3; i++) {
    Thread consumerThread = new Thread(this::consume);
    consumerThread.start();
}

5. 結論

この記事では、Javaスレッドを使用して生産者/消費者問題を実装する方法を学びました。 また、複数のプロデューサーとコンシューマーでシナリオを実行する方法も学びました。

完全なコードサンプルは、GitHubにあります。