1. 概要

この記事では、同時生産者/消費者問題を解決するために最も有用な構成の1つjava.util.concurrentを見ていきます。 BlockingQueue インターフェースのAPIと、そのインターフェースのメソッドが並行プログラムの作成をどのように容易にするかを見ていきます。

この記事の後半では、複数のプロデューサースレッドと複数のコンシューマースレッドを持つ単純なプログラムの例を示します。

2. BlockingQueueタイプ

BlockingQueueには次の2つのタイプがあります。

  • 無制限のキュー–ほぼ無期限に拡張できます
  • 制限付きキュー–最大容量が定義されています

2.1. 無制限のキュー

無制限のキューの作成は簡単です。

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

blockingQueueの容量Integer.MAX_VALUEに設定されます。無制限のキューに要素を追加するすべての操作はブロックされないため、非常に大きなサイズになる可能性があります。

無制限のBlockingQueueを使用してプロデューサー/コンシューマープログラムを設計する場合に最も重要なことは、プロデューサーがキューにメッセージを追加するのと同じ速さでコンシューマーがメッセージを消費できるようにすることです。 そうしないと、メモリがいっぱいになり、OutOfMemory例外が発生する可能性があります。

2.2. 制限付きキュー

2番目のタイプのキューは、制限付きキューです。 コンストラクターに引数として容量を渡すことにより、このようなキューを作成できます。

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

ここに、容量が10に等しいblockingQueueがあります。 これは、プロデューサーが要素を追加するために使用されたメソッド( offer() add()、またはに応じて、既に満杯のキューに要素を追加しようとした場合を意味します。 ] put())、オブジェクトを挿入するためのスペースが利用可能になるまでブロックします。 そうしないと、操作が失敗します。

すでに満杯のキューに要素を挿入する場合、その操作は、コンシューマーが追いつき、キューで使用可能なスペースを確保するまで待機する必要があるため、制限付きキューを使用することは、同時プログラムを設計するための良い方法です。 それは私たちの側で何の努力もせずに私たちにスロットルを与えます。

3. BlockingQueue API

BlockingQueue interface キューへの要素の追加を担当するメソッドと、それらの要素を取得するメソッドには、2つのタイプのメソッドがあります。 キューがいっぱい/空の場合、これら2つのグループの各メソッドの動作は異なります。

3.1. 要素の追加

  • add()– は、挿入が成功した場合は true を返し、そうでない場合はIllegalStateExceptionをスローします。
  • put()– は指定された要素をキューに挿入し、必要に応じて空きスロットを待ちます
  • offer()– は、挿入が成功した場合は true を返し、それ以外の場合はfalseを返します。
  • offer(E e、長いタイムアウト、TimeUnitユニット)– キューに要素を挿入しようとし、指定されたタイムアウト内に使用可能なスロットを待機します

3.2. 要素の取得

  • take() –キューのヘッド要素を待機して削除します。 キューが空の場合、キューはブロックされ、要素が使用可能になるのを待ちます
  • poll(long timeout、TimeUnit unit)– はキューの先頭を取得して削除し、要素が使用可能になるまで必要に応じて指定された待機時間まで待機します。 タイムアウト後にnullを返します

これらのメソッドは、プロデューサー-コンシューマープログラムを構築する際のBlockingQueueインターフェイスからの最も重要なビルディングブロックです。

4. マルチスレッドの生産者/消費者の例

プロデューサーとコンシューマーの2つの部分で構成されるプログラムを作成しましょう。

プロデューサーは0から100までの乱数を生成し、その数をBlockingQueueに入れます。 4つのプロデューサースレッドがあり、 put()メソッドを使用して、キューに使用可能なスペースができるまでブロックします。

覚えておくべき重要なことは、消費者スレッドが要素がキューに無期限に現れるのを待つのを止める必要があるということです。

処理するメッセージがこれ以上ないことをプロデューサーからコンシューマーに通知するための優れた手法は、ポイズンピルと呼ばれる特別なメッセージを送信することです。 消費者と同じ数の毒薬を送る必要があります。 次に、消費者がその特別な毒薬メッセージをキューから受け取ると、正常に実行が終了します。

プロデューサークラスを見てみましょう:

public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;
    
    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
     }
}

プロデューサーコンストラクターは、プロデューサーとコンシューマーの間の処理を調整するために使用されるBlockingQueueを引数として取ります。 メソッドgenerateNumbers()が100個の要素をキューに入れることがわかります。 実行が終了するときにどのタイプのメッセージをキューに入れる必要があるかを知るには、ポイズンピルメッセージも必要です。 そのメッセージは、poisonPillPerProducer回キューに入れる必要があります。

各コンシューマーは、 take()メソッドを使用して BlockingQueue から要素を取得するため、キューに要素が存在するまでブロックされます。 キューから整数を取得した後、メッセージがポイズンピルであるかどうかを確認します。そうである場合は、スレッドの実行が終了します。 それ以外の場合は、現在のスレッド名とともに結果を標準出力に出力します。

これにより、消費者の内部の仕組みについての洞察が得られます。

public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;
    
    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

注意すべき重要なことは、キューの使用法です。 プロデューサーコンストラクターと同様に、キューは引数として渡されます。 BlockingQueue は、明示的な同期なしでスレッド間で共有できるため、これを行うことができます。

これで、プロデューサーとコンシューマーができたので、プログラムを開始できます。 キューの容量を定義する必要があり、100要素に設定します。

4つのプロデューサースレッドが必要であり、コンシューマースレッドの数は使用可能なプロセッサーの数と同じになります。

int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 1; i < N_PRODUCERS; i++) {
    new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}

new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();

BlockingQueue は、容量のある構成を使用して作成されます。 4つのプロデューサーとNのコンシューマーを作成しています。 ポイズンピルメッセージをInteger.MAX_VALUEに指定します。これは、このような値が通常の作業条件下でプロデューサーによって送信されることはないためです。 ここで注意すべき最も重要なことは、BlockingQueueがそれらの間の作業を調整するために使用されることです。

プログラムを実行すると、4つのプロデューサースレッドがランダムな整数 BlockingQueue に配置し、コンシューマーがそれらの要素をキューから取得します。 各スレッドは、結果とともにスレッドの名前を標準出力に出力します。

5. 結論

この記事では、 BlockingQueue の実際の使用法を示し、そこから要素を追加および取得するために使用されるメソッドについて説明します。 また、 BlockingQueue を使用してマルチスレッドの生産者/消費者プログラムを構築し、生産者と消費者の間の作業を調整する方法も示しました。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenベースのプロジェクトであるため、そのままインポートして実行するのは簡単です。