1概要

この記事では、

java.util.concurrent

パッケージに含まれる


SynchronousQueue


について説明します。 。

簡単に言えば、この実装により、スレッドセーフな方法でスレッド間で情報を交換できます。


2 APIの概要


SynchronousQueue

は、**

take()

と__put()の2つのサポートされた操作のみを持ちます。

たとえば、キュ​​ーに要素を追加したい場合は、

put()

メソッドを呼び出す必要があります。そのメソッドは、他のスレッドが__take()メソッドを呼び出すまでブロックし、要素を受け取る準備ができたことを知らせます。


SynchronousQueue

はキューのインターフェースを持っていますが、それを2つのスレッド間の単一要素の交換ポイントとして考える必要があります。1つのスレッドが要素を引き渡し、別のスレッドがその要素を引き継ぎます。


3シェア変数を使用したハンドオフの実装


SynchronousQueue

が非常に便利な理由を確認するために、2つのスレッド間で共有変数を使用してロジックを実装し、次に

SynchronousQueue

を使用してそのロジックを書き換えて、コードをよりシンプルで読みやすくします。

プロデューサとコンシューマの2つのスレッドがあり、プロデューサがシェア変数の値を設定しているときに、そのことをコンシューマスレッドに通知したいとします。次に、コンシューマスレッドは共有変数から値を取得します。

これら2つのスレッドを調整するために

CountDownLatch

を使用して、コンシューマーがまだ設定されていない共有変数の値にアクセスするときの状況を回避します。

処理を調整するために使用される

sharedState

変数と

CountDownLatch

を定義します。

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

プロデューサはランダムな整数を

sharedState

変数に保存し、

countDownLatchに対して

countDown()メソッドを実行し、__sharedStateから値を取得できることをコンシューマに通知します。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};

コンシューマは

await()

メソッドを使用して

countDownLatch

を待機します。プロデューサが変数が設定されたことを通知すると、コンシューマは__sharedStateからそれを取得します。

Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

大事なことを言い忘れましたが、プログラムを開始しましょう。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

次のような出力が生成されます。

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

これは、2つのスレッド間で要素を交換するなどの単純な機能を実装するための多くのコードであることがわかります。次のセクションでは、それをもっと良くしようとします。


4

SynchronousQueue


を使用したハンドオフの実装

それでは、前のセクションと同じ機能を

SynchronousQueueを使って実装しましょう。これは、スレッド間で状態を交換し、そのアクションを調整するために使用できるので、

SynchronousQueue以外には何も必要ありません。 .__

まず、キューを定義します。

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

プロデューサは

put()

メソッドを呼び出し、他のスレッドがキューから要素を取得するまでブロックします。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

コンシューマは、

take()

メソッドを使用してその要素を単純に取得します。

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

次に、プログラムを開始します。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

次のような出力が生成されます。

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point


SynchronousQueue

がスレッド間の交換ポイントとして使用されていることがわかります。これは、

CountDownLatch.

と共に共有状態を使用した前の例よりもはるかに良く理解しやすいものです。


5結論

このクイックチュートリアルでは、

SynchronousQueue

コンストラクトを調べました。

共有状態を使用して2つのスレッド間でデータを交換するプログラムを作成し、そのプログラムを書き直して

SynchronousQueue

構文を活用しました。これはプロデューサとコンシューマスレッドを調整する交換点として機能します。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubプロジェクト]にあります。インポートしてそのまま実行するのは簡単なはずです。