DelayQueueのガイド
1概要
この記事では、
java.util.concurrent
の
DelayQueue
構造について説明します。パッケージ。これはプロデューサー/コンシューマープログラムで使用できるブロッキングキューです。
それは非常に有用な特性を持っています – ** 消費者がキューから要素を取りたがっているとき、彼らはその特定の要素のための遅延が期限が切れたときにだけそれを取ることができます。
2
DelayQueue
の要素に
Delayed
を実装する
DelayQueue
に入れたい各要素は
Delayed
インタフェースを実装する必要があります。
DelayObject
クラスを作成したいとしましょう。
そのクラスのインスタンスは__DelayQueueに入れられます。
コンストラクタに
String
dataと
delayInMilliseconds
と引数を渡します。
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
startTimeを定義しています。これは、要素がキューから消費される時間です。次に、
getDelay()__メソッドを実装する必要があります。このオブジェクトに関連付けられた残りの遅延を、指定された時間単位で返します。
したがって、残りの遅延を適切な
TimeUnitに返すには、
TimeUnit.convert()__メソッドを使用する必要があります。
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
コンシューマがキューから要素を取り出そうとすると、
DelayQueue
は
getDelay()
を実行して、その要素をキューから返すことが許可されているかどうかを調べます。
getDelay()
メソッドがゼロまたは負の数を返す場合、それはキューから取得できることを意味します。
DelayQueue
内の要素は有効期限に従ってソートされるため、
compareTo()
メソッドも実装する必要があります。最初に期限切れになる項目はキューの先頭に保持され、有効期限が最も長い要素はキューの末尾に保持されます。
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
3
__DelayQueue C
__onsumerとプロデューサー
DelayQueue
をテストできるようにするには、プロデューサーロジックとコンシューマーロジックを実装する必要があります。プロデューサクラスは、キュー、生成する要素数、および各メッセージの遅延(ミリ秒)を引数として取ります。
それから
run()
メソッドが呼び出されると、それは要素をキューに入れ、それぞれの書き込みの後500ミリ秒間スリープします
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
//standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
-
コンシューマ実装** は非常に似ていますが、消費されたメッセージ数も追跡します。
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
//standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4
DelayQueue
使用状況テスト
__DelayQueueの動作をテストするには、1つのプロデューサスレッドと1つのコンシューマスレッドを作成します。
プロデューサは、2つのオブジェクトを500ミリ秒の遅延でキューに
put()
します。テストは、コンシューマが2つのメッセージを消費したことを表明します。
@Test
public void givenDelayQueue__whenProduceElement
__thenShouldConsumeAfterGivenDelay() throws InterruptedException {
//given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
//when
executor.submit(producer);
executor.submit(consumer);
//then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
このプログラムを実行すると、次のような出力が生成されることがわかります。
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
プロデューサはオブジェクトを置き、しばらくすると遅延が期限切れになった最初のオブジェクトが消費されます。
2番目の要素についても同じ状況が発生しました。
5与えられた時間内に消費できない消費者
たとえば、10秒以内に期限切れになる要素を生産しているプロデューサーがいるとしましょう。
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10__000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
テストを開始しますが、5秒後に終了します。
DelayQueueの特性により、
要素はまだ期限切れになっていないため、__消費者はキューからメッセージを消費することはできません。
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
消費者の
numberOfConsumedElements
はゼロに等しい値を持つことに注意してください。
6. すぐに有効期限が切れる要素を作成する
Delayed
message
getDelay()
メソッドの実装が負の数を返す場合、それは与えられた要素がすでに期限切れになっていることを意味します。この場合、プロデューサはその要素をすぐに消費します。
負の遅れで要素を生成する状況をテストすることができます。
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10__000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
テストケースを開始すると、すでに期限切れになっているため、コンシューマは要素をすぐに消費します。
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7. 結論
この記事では、
java.util.concurrent
パッケージの
DelayQueue
構文を見ていました。
キューから生成され消費される
Delayed
要素を実装しました。
期限切れになった要素を消費するために
DelayQueue
の実装を活用しました。
これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubプロジェクト]にあります。インポートしてそのまま実行するのは簡単なはずです。