1. 序章

この記事では、 PriorityBlockingQueue クラスに焦点を当て、いくつかの実用的な例について説明します。

キューが何であるかをすでに知っているという仮定から始めて、最初にPriorityBlockingQueue内の要素が優先度によってどのように順序付けられるかを示します。

これに続いて、このタイプのキューを使用してスレッドをブロックする方法を示します。

最後に、これら2つの機能を一緒に使用すると、複数のスレッド間でデータを処理するときにどのように役立つかを示します。

2. 要素の優先順位

標準のキューとは異なり、任意のタイプの要素をに追加することはできません。 PriorityBlockingQueue。 2つのオプションがあります。

  1. Compareableを実装する要素の追加
  2. コンパレータも提供することを条件に、Compareableを実装しない要素を追加する

コンパレータまたはComparable実装のいずれかを使用して要素を比較することにより、PriorityBlockingQueueは常にソートされます。

目的は、最も優先度の高い要素が常に最初に順序付けられる方法で比較ロジックを実装することです。 次に、キューから要素を削除すると、常に最も優先度の高い要素になります。

まず、複数のスレッドでキューを使用するのではなく、単一のスレッドでキューを使用してみましょう。 これを行うことで、単体テストで要素がどのように順序付けられているかを簡単に証明できます。

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ArrayList<Integer> polledElements = new ArrayList<>();
 
queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);

queue.drainTo(polledElements);

assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);

ご覧のとおり、要素をランダムな順序でキューに追加しているにもかかわらず、ポーリングを開始すると要素は順序付けられます。 これは、 IntegerクラスがComparable、を実装しているためです。これは、昇順でキューから確実に取り出すために使用されます。

また、 2つの要素が比較されて同じである場合、それらがどのように順序付けられるかについての保証はありません。

3. キューを使用してブロックする

標準キューを処理している場合は、 poll()を呼び出して要素を取得します。 ただし、キューが空の場合、 poll()を呼び出すとnullが返されます。

PriorityBlockingQueue は、 BlockingQueue インターフェイスを実装します。これにより、空のキューから削除するときにブロックできるいくつかの追加メソッドが提供されます。 take()メソッドを使用してみましょう。これにより、次のことが正確に実行されます。

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

new Thread(() -> {
  System.out.println("Polling...");

  try {
      Integer poll = queue.take();
      System.out.println("Polled: " + poll);
  } catch (InterruptedException e) {
      e.printStackTrace();
  }
}).start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.add(1);

sleep()を使用することは、物事を示すためのやや脆弱な方法ですが、このコードを実行すると、次のようになります。

Polling...
Adding to queue
Polled: 1

これは、アイテムが追加されるまで take()がブロックされたことを証明します。

  1. スレッドは「ポーリング」を出力して、開始されたことを証明します
  2. その後、テストは約5秒間一時停止し、スレッドがこの時点で take()を呼び出している必要があることを証明します。
  3. キューに追加すると、 take()が要素を利用可能になるとすぐに返したことを証明するために、多かれ少なかれ即座に「Polled:1」が表示されます。

BlockingQueue インターフェースは、フルキューに追加するときにブロックする方法も提供することにも言及する価値があります。

ただし、PriorityBlockingQueueには制限がありません。 これは、それがいっぱいになることは決してないことを意味します。したがって、新しい要素をいつでも追加できます。

4. ブロッキングと優先順位付けを一緒に使用する

PriorityBlockingQueueの2つの重要な概念について説明したので、両方を一緒に使用してみましょう。 前の例を単純に拡張できますが、今回はキューに要素を追加します。

Thread thread = new Thread(() -> {
    System.out.println("Polling...");
    while (true) {
        try {
            Integer poll = queue.take();
            System.out.println("Polled: " + poll);
        } 
        catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }
});

thread.start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");

queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));

繰り返しになりますが、これは sleep()を使用しているため少し脆弱ですが、は依然として有効なユースケースを示しています。 これで、要素が追加されるのを待ってブロックするキューができました。 次に、一度に多くの要素を追加し、それらが優先順に処理されることを示します。 出力は次のようになります。

Polling...
Adding to queue
Polled: 1
Polled: 1
Polled: 2
Polled: 5
Polled: 6
Polled: 6
Polled: 7

5. 結論

このガイドでは、 PriorityBlockingQueue を使用して、いくつかのアイテムが追加されるまでスレッドをブロックする方法と、それらのアイテムを優先度に基づいて処理できることを示しました。

これらの例の実装は、GitHubにあります。 これはMavenベースのプロジェクトであるため、そのまま実行するのは簡単です。