1. 概要

この記事では、 java.util.concurrentパッケージのSynchronousQueueについて説明します。

簡単に言えば、この実装により、スレッドセーフな方法でスレッド間で情報を交換できます。

2. APIの概要

SynchronousQueue には、 2つのサポートされている操作(take()とput())のみがあり、両方ともブロッキングです。

たとえば、キューに要素を追加する場合は、 put()メソッドを呼び出す必要があります。 そのメソッドは、他のスレッドが take()メソッドを呼び出して、要素を取得する準備ができていることを通知するまでブロックします。

SynchronousQueue にはキューのインターフェイスがありますが、1つのスレッドが要素を渡し、別のスレッドがその要素を取得する、2つのスレッド間の単一要素の交換ポイントと考える必要があります。 。

3. 共有変数を使用したハンドオフの実装

SynchronousQueue が非常に役立つ理由を確認するために、2つのスレッド間で共有変数を使用してロジックを実装し、次に SynchronousQueue を使用してそのロジックを書き直し、コードを大幅に簡素化します。そしてより読みやすい。

プロデューサーとコンシューマーの2つのスレッドがあり、プロデューサーが共有変数の値を設定しているときに、その事実をコンシューマースレッドに通知するとします。 次に、コンシューマースレッドは共有変数から値をフェッチします。

CountDownLatch を使用して、これら2つのスレッドを調整し、コンシューマーがまだ設定されていない共有変数の値にアクセスする状況を防ぎます。

処理の調整に使用されるsharedState変数とCountDownLatchを定義します。

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

プロデューサーはランダムな整数をsharedState変数に保存し、 countDownLatchでcountDown()メソッドを実行し、コンシューマーにフェッチできることを通知します。 sharedStateからの値:

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};

コンシューマーは、 await()メソッドを使用してcountDownLatchを待機します。 プロデューサーが変数が設定されたことを通知すると、コンシューマーはそれを sharedState:からフェッチします。

Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

最後になりましたが、プログラムを開始しましょう。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

次の出力が生成されます。

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

これは、2つのスレッド間で要素を交換するなどの単純な機能を実装するための多くのコードであることがわかります。 次のセクションでは、それを改善しようとします。

4. SynchronousQueueを使用したハンドオフの実装

前のセクションと同じ機能を実装しましょう。ただし、 SynchronousQueueを使用します。スレッド間で状態を交換したり、そのアクションを調整したりするために使用できるため、二重の効果があります。 SynchronousQueue以外のものを使用します。

まず、キューを定義します。

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

プロデューサーはput()メソッドを呼び出します。このメソッドは、他のスレッドがキューから要素を取得するまでブロックします。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

コンシューマーは、 take()メソッドを使用してその要素を取得するだけです。

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

次に、プログラムを開始します。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

次の出力が生成されます。

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

SynchronousQueue がスレッド間の交換ポイントとして使用されていることがわかります。これは、CountDownLatch。と一緒に共有状態を使用した前の例よりもはるかに優れていて理解しやすいです。

5. 結論

このクイックチュートリアルでは、SynchronousQueue構造を確認しました。 共有状態を使用して2つのスレッド間でデータを交換するプログラムを作成し、そのプログラムを書き直してSynchronousQueue構成を活用しました。 これは、プロデューサースレッドとコンシューマースレッドを調整する交換ポイントとして機能します。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。