1. 概要

この記事では、標準の java.util.concurrentパッケージからのTransferQueueコンストラクトについて説明します。

簡単に言えば、このキューを使用すると、生産者/消費者パターンに従ってプログラムを作成し、生産者から消費者に渡されるメッセージを調整できます。

実装は実際にはBlockingQueue– に似ていますが、ある種のバックプレッシャを実装する新しい機能を提供します。 つまり、プロデューサーが transfer()メソッドを使用してコンシューマーにメッセージを送信すると、メッセージが消費されるまでプロデューサーはブロックされたままになります。

2. 1つのプロデューサー–ゼロの消費者

TransferQueueからtransfer()メソッドをテストしてみましょう。予想される動作は、コンシューマーが take()を使用してキューからメッセージを受信するまでプロデューサーがブロックされることです。 メソッド。

これを実現するために、プロデューサーは1人、コンシューマーは0人のプログラムを作成します。 プロデューサースレッドからのtransfer()の最初の呼び出しは、キューからその要素をフェッチするコンシューマーがないため、無期限にブロックされます。

プロデューサークラスがどのように見えるか見てみましょう。

class Producer implements Runnable {
    private TransferQueue<String> transferQueue;
 
    private String name;
 
    private Integer numberOfMessagesToProduce;
 
    public AtomicInteger numberOfProducedMessages
      = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToProduce; i++) {
            try {
                boolean added 
                  = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
                if(added){
                    numberOfProducedMessages.incrementAndGet();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // standard constructors
}

TransferQueue のインスタンスを、プロデューサーに付ける名前とキューに転送する必要のある要素の数とともにコンストラクターに渡します。

tryTransfer()メソッドを、指定されたタイムアウトで使用していることに注意してください。 4秒間待機しており、プロデューサーが指定されたタイムアウト内にメッセージを転送できない場合、 false を返し、次のメッセージに進みます。 プロデューサーには、生成されたメッセージの数を追跡するためのnumberOfProducedMessages変数があります。

次に、Consumerクラスを見てみましょう。

class Consumer implements Runnable {
 
    private TransferQueue<String> transferQueue;
 
    private String name;
 
    private int numberOfMessagesToConsume;
 
    public AtomicInteger numberOfConsumedMessages
     = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToConsume; i++) {
            try {
                String element = transferQueue.take();
                longProcessing(element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void longProcessing(String element)
      throws InterruptedException {
        numberOfConsumedMessages.incrementAndGet();
        Thread.sleep(500);
    }
    
    // standard constructors
}

これはプロデューサーに似ていますが、 take()メソッドを使用してキューから要素を受け取っています。 また、受信したメッセージのカウンターである numberOfConsumedMessages 変数をインクリメントする、 longProcessing()メソッドを使用して、長時間実行されるアクションをシミュレートしています。

それでは、1人のプロデューサーだけでプログラムを始めましょう。

@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);

    // when
    exService.execute(producer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}

3つの要素をキューに送信したいのですが、プロデューサーは最初の要素でブロックされており、キューからその要素をフェッチするコンシューマーはありません tryTransfer()[を使用していますX202X] method これは、メッセージが消費されるか、タイムアウトに達するまでブロックします。 タイムアウト後、転送が失敗したことを示す false が返され、次の転送が試行されます。 これは前の例からの出力です:

Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...

3. 1つのプロデューサー–1つのコンシューマー

1つのプロデューサーと1つのコンシューマーがある状況をテストしてみましょう。

@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);
    Consumer consumer = new Consumer(transferQueue, "1", 3);

    // when
    exService.execute(producer);
    exService.execute(consumer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 3);
    assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}

TransferQueue は交換ポイントとして使用され、コンシューマーがキューから要素を消費するまで、プロデューサーは別の要素をキューに追加することを続行できません。 プログラムの出力を見てみましょう。

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2

TransferQueue。の仕様により、キューからの要素の生成と消費はシーケンシャルであることがわかります。

4. 多くの生産者–多くの消費者

最後の例では、複数のコンシューマーと複数のプロデューサーを持つことを検討します。

@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(3);
    Producer producer1 = new Producer(transferQueue, "1", 3);
    Producer producer2 = new Producer(transferQueue, "2", 3);
    Consumer consumer1 = new Consumer(transferQueue, "1", 3);
    Consumer consumer2 = new Consumer(transferQueue, "2", 3);

    // when
    exService.execute(producer1);
    exService.execute(producer2);
    exService.execute(consumer1);
    exService.execute(consumer2);

    // then
    exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
    assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}

この例では、2つのコンシューマーと2つのプロデューサーがあります。 プログラムが開始すると、両方のプロデューサーが1つの要素を生成でき、その後、コンシューマーの1つがキューからその要素を取得するまでブロックされます。

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2

5. 結論

この記事では、java.util.concurrentパッケージからのTransferQueueコンストラクトについて説明しました。

その構成を使用して生産者/消費者プログラムを実装する方法を見ました。 transfer()メソッドを使用して、バックプレッシャの形式を作成しました。この形式では、コンシューマーがキューから要素を取得するまで、プロデューサーは別の要素を公開できません。

TransferQueue は、キューをメッセージで溢れさせてOutOfMemoryエラーが発生する過剰生産のプロデューサーが必要ない場合に非常に役立ちます。 このような設計では、コンシューマーがプロデューサーがメッセージを生成する速度を決定します。

これらの例とコードスニペットはすべて、GitHubにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。