1概要

この記事では、

java.util.concurrent




DelayQueue


構造について説明します。パッケージ。これはプロデューサー/コンシューマープログラムで使用できるブロッキングキューです。

それは非常に有用な特性を持っています – ** 消費者がキューから要素を取りたがっているとき、彼らはその特定の要素のための遅延が期限が切れたときにだけそれを取ることができます。


2

DelayQueue


の要素に

Delayed

を実装する


DelayQueue

に入れたい各要素は


Delayed


インタフェースを実装する必要があります。

DelayObject

クラスを作成したいとしましょう。

そのクラスのインスタンスは__DelayQueueに入れられます。

コンストラクタに

String

dataと

delayInMilliseconds

と引数を渡します。

public class DelayObject implements Delayed {
    private String data;
    private long startTime;

    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }


startTimeを定義しています。これは、要素がキューから消費される時間です。次に、

getDelay()__メソッドを実装する必要があります。このオブジェクトに関連付けられた残りの遅延を、指定された時間単位で返します。

したがって、残りの遅延を適切な

TimeUnitに返すには、

TimeUnit.convert()__メソッドを使用する必要があります。

@Override
public long getDelay(TimeUnit unit) {
    long diff = startTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
}

コンシューマがキューから要素を取り出そうとすると、

DelayQueue



getDelay()

を実行して、その要素をキューから返すことが許可されているかどうかを調べます。

getDelay()

メソッドがゼロまたは負の数を返す場合、それはキューから取得できることを意味します。


DelayQueue

内の要素は有効期限に従ってソートされるため、

compareTo()

メソッドも実装する必要があります。最初に期限切れになる項目はキューの先頭に保持され、有効期限が最も長い要素はキューの末尾に保持されます。

@Override
public int compareTo(Delayed o) {
    return Ints.saturatedCast(
      this.startTime - ((DelayObject) o).startTime);
}


3

__DelayQueue C

__onsumerとプロデューサー


DelayQueue

をテストできるようにするには、プロデューサーロジックとコンシューマーロジックを実装する必要があります。プロデューサクラスは、キュー、生成する要素数、および各メッセージの遅延(ミリ秒)を引数として取ります。

それから

run()

メソッドが呼び出されると、それは要素をキューに入れ、それぞれの書き込みの後500ミリ秒間スリープします

public class DelayQueueProducer implements Runnable {

    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToProduce;
    private Integer delayOfEachProducedMessageMilliseconds;

   //standard constructor

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object
              = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

  • コンシューマ実装** は非常に似ていますが、消費されたメッセージ数も追跡します。

public class DelayQueueConsumer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToTake;
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();

   //standard constructors

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


4

DelayQueue

使用状況テスト

__DelayQueueの動作をテストするには、1つのプロデューサスレッドと1つのコンシューマスレッドを作成します。

プロデューサは、2つのオブジェクトを500ミリ秒の遅延でキューに

put()

します。テストは、コンシューマが2つのメッセージを消費したことを表明します。

@Test
public void givenDelayQueue__whenProduceElement
  __thenShouldConsumeAfterGivenDelay() throws InterruptedException {
   //given
    ExecutorService executor = Executors.newFixedThreadPool(2);

    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(
      queue, numberOfElementsToProduce);
    DelayQueueProducer producer = new DelayQueueProducer(
      queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

   //when
    executor.submit(producer);
    executor.submit(consumer);

   //then
    executor.awaitTermination(5, TimeUnit.SECONDS);
    executor.shutdown();

    assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}

このプログラムを実行すると、次のような出力が生成されることがわかります。

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

プロデューサはオブジェクトを置き、しばらくすると遅延が期限切れになった最初のオブジェクトが消費されます。

2番目の要素についても同じ状況が発生しました。


5与えられた時間内に消費できない消費者

たとえば、10秒以内に期限切れになる要素を生産しているプロデューサーがいるとしましょう。

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10__000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
  queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

テストを開始しますが、5秒後に終了します。

DelayQueueの特性により、

要素はまだ期限切れになっていないため、__消費者はキューからメッセージを消費することはできません。

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);

消費者の

numberOfConsumedElements

はゼロに等しい値を持つことに注意してください。


6. すぐに有効期限が切れる要素を作成する


Delayed

message

getDelay()

メソッドの実装が負の数を返す場合、それは与えられた要素がすでに期限切れになっていることを意味します。この場合、プロデューサはその要素をすぐに消費します。

負の遅れで要素を生成する状況をテストすることができます。

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10__000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

テストケースを開始すると、すでに期限切れになっているため、コンシューマは要素をすぐに消費します。

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);


7. 結論

この記事では、

java.util.concurrent

パッケージの

DelayQueue

構文を見ていました。

キューから生成され消費される

Delayed

要素を実装しました。

期限切れになった要素を消費するために

DelayQueue

の実装を活用しました。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubプロジェクト]にあります。インポートしてそのまま実行するのは簡単なはずです。