1概要

この記事では、プロデューサーとコンシューマーのコンカレント問題を解決するための最も有用な構文

java.util.concurrent

の1つを取り上げます。


BlockingQueue


インターフェースのAPIと、そのインターフェースからのメソッドが並行書き込みを行う方法プログラムが簡単になります。

この記事の後半で、複数のプロデューサースレッドと複数のコンシューマースレッドを持つ単純なプログラムの例を示します。


2

BlockingQueue

タイプ

2種類の

BlockingQueue

を区別できます。

  • 無制限キュー – ほぼ無制限に大きくなる可能性があります

  • 制限付きキュー – 最大容量が定義されています


2.1. 無制限キュー

無制限キューを作成するのは簡単です。

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();


blockingQueue

の容量は

Integer.MAX

VALUEに設定されます。無制限キューに要素を追加するすべての操作はブロックされないため、サイズが非常に大きくなることがあります。

無制限のBlockingQueueを使用してプロデューサ – コンシューマプログラムを設計する際の最も重要なことは、プロデューサがキューにメッセージを追加するのと同じくらい早くコンシューマがメッセージを消費できるようにすることです。そうでなければ、メモリがいっぱいになり、

OutOfMemory

例外が発生します。


2.2. バウンドキュー

2番目の種類のキューは、境界キューです。容量を引数としてコンストラクタに渡すことで、このようなキューを作成できます。

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

ここでは、10に等しい容量を持つ

blockingQueue

があります。つまり、追加に使用されたメソッド(

offer()



add()

)に応じて、コンシューマがすでにフルキューに要素を追加しようとした場合です。または

put()

)は、オブジェクトを挿入するためのスペースが使用可能になるまでブロックされます。そうでなければ、操作は失敗します。

すでにフルキューに要素を挿入するとき、コンシューマが追いついてキュー内のスペースを確保するまでその操作を待つ必要があるため、境界キューを使用することは並行プログラムを設計するのに適した方法です。それは私達の側に何の努力もせずに私達に調整を与える。


3

BlockingQueue

API


BlockingQueue

interfaceには、キューへの要素の追加を担当するメソッドと、それらの要素を取得するメソッドの2種類があります。これら2つのグループからの各メソッドは、キューがいっぱい/空の場合は異なる動作をします。


3.1. 要素を追加する


  • add() –

    は、挿入が成功した場合は

    true

    を返し、それ以外の場合はスローします。


IllegalStateException

**

put()–

は、指定された要素をキューに挿入します。

必要に応じて空きスロット
**

offer()–

は、挿入が成功した場合は

true

を返し、それ以外の場合は

true

を返します。


虚偽

**

offer(E e、長いタイムアウト、TimeUnit単位)–

は要素を挿入しようとします

キューに入れ、指定されたタイムアウト内に使用可能なスロットを待つ


3.2. 要素を取得する


  • take()

    – キューの先頭要素を待ち、それを削除します。あれば

キューが空の場合、ブロックされ、要素が使用可能になるのを待ちます。
**

poll(長いタイムアウト、TimeUnit単位)–

はヘッドを取得して削除します。

要素が使用可能になるのに必要な場合は、指定された待機時間まで待機します。タイムアウト後に

null

を返します____

これらのメソッドは、プロデューサー/コンシューマープログラムを構築する際の

BlockingQueue

インターフェースからの最も重要な構成要素です。


4マルチスレッドプロデューサ/コンシューマの例

プロデューサーとコンシューマーの2つの部分からなるプログラムを作成しましょう。

プロデューサーは0から100までの乱数を生成し、その数を

BlockingQueue

に入れます。 4つのプロデューサースレッドがあり、

put()

メソッドを使用して、キューに空きができるまでブロックします。

覚えておくべき重要なことは、要素が待ち行列に現れるのを無期限に待つことから消費者スレッドを停止する必要があることです。

プロデューサからコンシューマに、処理するメッセージがもうないことを知らせるための良い方法は、ポイズンピルと呼ばれる特別なメッセージを送信することです。私たちは消費者と同じ数の毒薬を送る必要があります。その後、消費者がその特別な毒の丸薬メッセージをキューから取り出すと、実行は正常に終了します。

プロデューサークラスを見てみましょう。

public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
     }
}

私たちのプロデューサーコンストラクターは、プロデューサーとコンシューマー間の処理を調整するために使用される

BlockingQueue

を引数として取ります。

メソッド

generateNumbers()

が100個の要素をキューに入れることがわかります。

実行が終了するときにどのタイプのメッセージがキューに入れられるのかを知るためには、poison pillメッセージも必要です。そのメッセージは

poisonPillPerProducer

回キューに入れる必要があります。

各コンシューマは

take()メソッドを使用して

BlockingQueue

から要素を取得するため、キューに要素が存在するまでブロックされます。キューから

Integer__を取り出した後、メッセージがポイズンピルであるかどうかをチェックし、そうであればスレッドの実行は終了します。それ以外の場合は、結果を現在のスレッドの名前とともに標準出力に出力します。

これは私達に私達の消費者の内部の働きへの洞察を与えるでしょう:

public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

注意すべき重要なことはキューの使用法です。プロデューサーコンストラクターと同じように、キューは引数として渡されます。明示的な同期なしで

BlockingQueue

をスレッド間で共有できるため、これを実行できます。

生産者と消費者が揃ったので、プログラムを開始できます。キューの容量を定義する必要があり、それを100要素に設定します。

4つのプロデューサスレッドを使用したいと考えています。コンシューマスレッドの数は、使用可能なプロセッサの数と同じになります。

int BOUND = 10;
int N__PRODUCERS = 4;
int N__CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX__VALUE;
int poisonPillPerProducer = N__CONSUMERS/N__PRODUCERS;
int mod = N__CONSUMERS % N__PRODUCERS;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 1; i < N__PRODUCERS; i++) {
    new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
}

for (int j = 0; j < N__CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}

new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();


BlockingQueue

は、容量を持つコンストラクトを使用して作成されます。私たちは4人の生産者とN人の消費者を作り出しています。ポイズンピルメッセージは

Integer.MAX

VALUE

になるように指定されています。そのような値は通常の作業条件下ではプロデューサーによって送信されないためです。ここで注意すべき最も重要なことは、

BlockingQueue__がそれらの間の作業を調整するために使用されるということです。

プログラムを実行すると、4つのプロデューサースレッドがランダムな

Integers



BlockingQueue

に入れ、コンシューマーはそれらの要素をキューから取り出します。各スレッドは結果と共にスレッド名を標準出力に表示します。


5結論

この記事では、

BlockingQueue

の実用的な使い方を説明し、そこから要素を追加および取得するために使用されるメソッドについて説明します。また、

BlockingQueue

を使用してプロデューサーとコンシューマーの間の作業を調整するマルチスレッドのプロデューサー/コンシューマープログラムを作成する方法も示しました。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubプロジェクト]にあります。そのため、インポートしてそのまま実行するのは簡単なはずです。