1概要

この記事では、標準の

java.utilからの


https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/TransferQueue.html


構造を調べます。 concurrent

パッケージ。

簡単に言うと、このキューによって、プロデューサ – コンシューマパターンに従ってプログラムを作成し、プロデューサからコンシューマに渡されるメッセージを調整することができます。

実装は実際には


BlockingQueue



に似ていますが、バックプレッシャーの形式を実装する新しい機能を提供します。つまり、プロデューサが__transfer()メソッドを使用してコンシューマにメッセージを送信すると、メッセージが消費されるまでプロデューサはブロックされたままになります。


2一人の生産者 – ゼロ消費者


TransferQueue

から

transfer()

メソッドをテストしましょう。予想される動作は、

take()

メソッドを使用してキューからメッセージを受信するまでプロデューサがブロックされることです。

それを達成するために、私たちは1人のプロデューサーがいるが0人のコンシューマーを持つプログラムを作ります。プロデューサスレッドからの__transfer()の最初の呼び出しは、その要素をキューから取得するコンシューマがいないため、無期限にブロックされます。


Producer

クラスがどのように見えるかを見てみましょう。

class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer numberOfMessagesToProduce;

    public AtomicInteger numberOfProducedMessages
      = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToProduce; i++) {
            try {
                boolean added
                  = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
                if(added){
                    numberOfProducedMessages.incrementAndGet();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
   //standard constructors
}


TransferQueue

のインスタンスを、プロデューサに付ける名前とキューに転送する必要がある要素の数と共にコンストラクタに渡します。

指定されたタイムアウトで、

tryTransfer()

メソッドを使用していることに注意してください。

4秒間待っていますが、プロデューサが指定されたタイムアウト時間内にメッセージを転送できない場合、

false

を返して次のメッセージに進みます。プロデューサには

numberOfProducedMessages

変数があり、生成されたメッセージ数を追跡できます。

次に、

Consumer

クラスを見てみましょう。

class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int numberOfMessagesToConsume;

    public AtomicInteger numberOfConsumedMessages
     = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToConsume; i++) {
            try {
                String element = transferQueue.take();
                longProcessing(element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void longProcessing(String element)
      throws InterruptedException {
        numberOfConsumedMessages.incrementAndGet();
        Thread.sleep(500);
    }

   //standard constructors
}

これはプロデューサーに似ていますが、

take()

メソッドを使用してキューから要素を受け取ります。また、受信したメッセージのカウンターである

numberOfConsumedMessages

変数をインクリメントしている

longProcessing()

メソッドを使用して、長期にわたるアクションをシミュレートしています。

それでは、1人のプロデューサーだけでプログラムを始めましょう。

@Test
public void whenUseOneProducerAndNoConsumers__thenShouldFailWithTimeout()
  throws InterruptedException {
   //given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);

   //when
    exService.execute(producer);

   //then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}

3つの要素をキューに送りたいのですが、プロデューサは最初の要素でブロックされており、キューからその要素をフェッチするコンシューマはありませんが消費されるか、タイムアウトになります。

タイムアウト後、転送が失敗したことを示すために

false

を返し、次の転送を試みます。これは前の例からの出力です:

Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...


3一人の生産者 – 一人の消費者

1人の生産者と1人の消費者がいるときの状況をテストしましょう。

@Test
public void whenUseOneConsumerAndOneProducer__thenShouldProcessAllMessages()
  throws InterruptedException {
   //given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);
    Consumer consumer = new Consumer(transferQueue, "1", 3);

   //when
    exService.execute(producer);
    exService.execute(consumer);

   //then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 3);
    assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}


TransferQueue

は交換ポイントとして使用され、コンシューマがキューから要素を消費するまで、プロデューサは別の要素を追加することはできません。プログラムの出力を見てみましょう。

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2


TransferQueue.

の指定により、キューからの要素の生成と消費はシーケンシャルであることがわかります。


4多くの生産者 – 多くの消費者

最後の例では、複数のコンシューマと複数のプロデューサを持つことを検討します。

@Test
public void whenMultipleConsumersAndProducers__thenProcessAllMessages()
  throws InterruptedException {
   //given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(3);
    Producer producer1 = new Producer(transferQueue, "1", 3);
    Producer producer2 = new Producer(transferQueue, "2", 3);
    Consumer consumer1 = new Consumer(transferQueue, "1", 3);
    Consumer consumer2 = new Consumer(transferQueue, "2", 3);

   //when
    exService.execute(producer1);
    exService.execute(producer2);
    exService.execute(consumer1);
    exService.execute(consumer2);

   //then
    exService.awaitTermination(10__000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
    assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}

この例では、2人の消費者と2人の生産者がいます。プログラムが開始されると、両方のプロデューサが1つの要素を生成できるようになり、その後、いずれかのコンシューマがその要素をキューから取り出すまでブロックします。

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2


5結論

この記事では、

java.util.concurrent

パッケージの

TransferQueue

構文を見ていました。

その構文を使って生産者 – 消費者プログラムを実装する方法を見ました。

transfer()

メソッドを使用してバックプレッシャーの形式を作成しました。この場合、コンシューマーがキューから要素を取得するまでプロデューサーは別の要素を公開できません。


TransferQueue

は、キューをメッセージであふれさせ、結果として

OutOfMemory

エラーが発生するような、過剰生産のプロデューサが不要な場合に非常に便利です。このような設計では、プロデューサがメッセージを生成する速度をコンシューマが決定します。

これらの例とコードスニペットはすべてhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubで公開]です。そのままインポートして実行します。