1. 概要

このチュートリアルでは、 JCTools (Java同時実行ツール)ライブラリを紹介します。

簡単に言えば、これにより、マルチスレッド環境での作業に適した多数のユーティリティデータ構造が提供されます。

2. 非ブロッキングアルゴリズム

従来、可変共有状態で動作するマルチスレッドコードは、ロックを使用して、データの一貫性と公開(1つのスレッドによって行われた変更で別のスレッドに表示される)を保証します。

このアプローチにはいくつかの欠点があります。

  • ロックを取得しようとしてスレッドがブロックされ、別のスレッドの操作が終了するまで進行しない場合があります。これにより、並列処理が効果的に防止されます。
  • ロックの競合が激しいほど、JVMがスレッドのスケジューリング、待機中のスレッドの競合とキューの管理に費やす時間が長くなり、実際の作業が少なくなります。
  • 複数のロックが関係していて、それらが間違った順序で取得/解放された場合、デッドロックが発生する可能性があります
  • 優先順位の逆転の危険が発生する可能性があります–優先度の低いスレッドがロックを保持しようとして、優先度の高いスレッドがロックされます
  • ほとんどの場合、粗粒度のロックが使用され、並列処理が大きく損なわれます。細粒度のロックには、より注意深い設計が必要であり、ロックのオーバーヘッドが増加し、エラーが発生しやすくなります。

別の方法は、 非ブロッキングアルゴリズム、すなわち スレッドの障害または一時停止が別のスレッドの障害または一時停止を引き起こさないアルゴリズム

関与するスレッドの少なくとも1つが任意の期間にわたって進行することが保証されている場合、非ブロッキングアルゴリズムはロックフリーです。 処理中にデッドロックが発生することはありません。

さらに、スレッドごとの進行が保証されている場合、これらのアルゴリズムは待機なしです。

これは、優れた Java Concurrency inPractice本のノンブロッキングStackの例です。 基本状態を定義します。

public class ConcurrentStack<E> {

    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    private static class Node <E> {
        public E item;
        public Node<E> next;

        // standard constructor
    }
}

また、いくつかのAPIメソッド:

public void push(E item){
    Node<E> newHead = new Node<E>(item);
    Node<E> oldHead;
    
    do {
        oldHead = top.get();
        newHead.next = oldHead;
    } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
        oldHead = top.get();
        if (oldHead == null) {
            return null;
        }
        newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    
    return oldHead.item;
}

アルゴリズムはきめ細かいコンペアアンドスワップ( CAS )命令を使用し、ロックフリー(複数のスレッドが top.compareAndSet()を呼び出した場合でも)であることがわかります。 同時に、そのうちの1つは成功することが保証されています)が、CASが特定のスレッドで最終的に成功する保証はないため待機なしではありません。

3. 依存

まず、JCToolsの依存関係をpom.xmlに追加しましょう。

<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>2.1.2</version>
</dependency>

利用可能な最新バージョンはMavenCentralで利用可能であることに注意してください。

4. JCToolsキュー

ライブラリは、マルチスレッド環境で使用するための多数のキューを提供します。 1つ以上のスレッドがキューに書き込み、1つ以上のスレッドがスレッドセーフなロックフリーの方法でキューから読み取ります。

すべてのQueue実装に共通のインターフェースは、org.jctools.queues.MessagePassingQueueです。

4.1. キューの種類

すべてのキューは、プロデューサー/コンシューマーポリシーに基づいて分類できます。

  • 単一のプロデューサー、単一のコンシューマー– このようなクラスは、接頭辞Spscを使用して名前が付けられます。 SpscArrayQueue
  • 単一のプロデューサー、複数のコンシューマー–Spmcプレフィックスを使用します。  SpmcArrayQueue
  • 複数のプロデューサー、単一のコンシューマー–Mpsc プレフィックスを使用します。例: MpscArrayQueue
  • 複数のプロデューサー、複数のコンシューマー–Mpmc プレフィックスを使用します。例:  MpmcArrayQueue

注意することが重要です内部的にはポリシーチェックはありません。 誤った使用法の場合、キューはサイレントに誤動作する可能性があります

例えば 以下のテストは、2つのスレッドからシングルプロデューサーキューにデータを入力し、コンシューマーが異なるプロデューサーからのデータを表示することが保証されていない場合でも合格します。

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

4.2. キューの実装

上記の分類を要約すると、JCToolsキューのリストは次のとおりです。

  • SpscArrayQueue – 単一のプロデューサー、単一のコンシューマー、内部でアレイを使用、容量を制限
  • SpscLinkedQueue – 単一のプロデューサー、単一のコンシューマー、内部でリンクリストを使用、バインドされていない容量
  • SpscChunkedArrayQueue – 単一のプロデューサー、単一のコンシューマー、初期容量から開始し、最大容量まで拡張します
  • SpscGrowableArrayQueue – 単一のプロデューサー、単一のコンシューマーは、初期容量から始まり、最大容量まで成長します。 これはSpscChunkedArrayQueueと同じコントラクトですが、唯一の違いは内部チャンク管理です。 実装が簡略化されているため、SpscChunkedArrayQueueを使用することをお勧めします
  • SpscUnboundedArrayQueue – 単一のプロデューサー、単一のコンシューマー、内部でアレイを使用、バインドされていない容量
  • SpmcArrayQueue – 単一のプロデューサー、複数のコンシューマー、内部でアレイを使用、容量を制限
  • MpscArrayQueue – 複数のプロデューサー、単一のコンシューマー、内部でアレイを使用、容量を制限
  • MpscLinkedQueue – 複数のプロデューサー、単一のコンシューマー、内部でリンクリストを使用、無制限の容量
  • MpmcArrayQueue – 複数のプロデューサー、複数のコンシューマー、内部で配列を使用、容量を制限

4.3. アトミックキュー

前のセクションで説明したすべてのキューは、sun.misc.Unsafeを使用します。 ただし、Java9とJEP-260 の登場により、このAPIにはデフォルトでアクセスできなくなります。

そのため、sun.misc.Unsafeの代わりにjava.util.concurrent.atomic.AtomicLongFieldUpdater(パブリックAPI、パフォーマンスが低い)を使用する代替キューがあります。

それらは上記のキューから生成され、それらの名前にはAtomicという単語が間に挿入されます。 SpscChunkedAtomicArrayQueueまたはMpmcAtomicArrayQueue

可能であれば「通常の」キューを使用し、HotSpotJava9+やJRockitなどのsun.misc.Unsafeが禁止/無効な環境でのみAtomicQueuesを使用することをお勧めします。

4.4. 容量

すべてのJCToolsキューにも最大容量があるか、バインドされていない可能性があります。 キューがいっぱいになり、容量に制限されると、新しい要素の受け入れを停止します。

次の例では、次のようにします。

  • キューを埋める
  • その後、新しい要素の受け入れを停止するようにしてください
  • それから排出し、後で要素を追加できることを確認します

読みやすくするために、いくつかのコードステートメントが削除されていることに注意してください。 完全な実装は、GitHubにあります。

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
    IntStream.range(0, queue.capacity()).forEach(i -> {
        assertThat(queue.offer(i)).isTrue();
    });
    assertThat(queue.offer(queue.capacity())).isFalse();
    startConsuming.countDown();
    awakeProducer.await();
    assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
  IntStream.range(0, 17).boxed().collect(toSet()));

5. その他のJCToolsデータ構造

JCToolsは、キュー以外のデータ構造もいくつか提供します。

それらのすべてを以下に示します。

  • NonBlockingHashMap –ロックフリーのConcurrentHashMap の代替手段であり、スケーリング特性が優れており、一般的にミューテーションコストが低くなります。  sun.misc.Unsafe を介して実装されているため、HotSpotJava9+またはJRockit環境でこのクラスを使用することはお勧めしません。
  • NonBlockingHashMapLong – NonBlockingHashMap に似ていますが、プリミティブlongキーを使用します
  • NonBlockingHashSet –JDKのjava.util.Collections.newSetFromMap()のような NonBlockingHashMap の単純なラッパー
  • NonBlockingIdentityHashMap – NonBlockingHashMap に似ていますが、IDによってキーを比較します。
  • NonBlockingSetInt –プリミティブlongsの配列として実装されたマルチスレッドビットベクトルセット。 サイレントオートボクシングの場合は効果がありません

6. 性能試験

JMH を使用して、JDKのArrayBlockingQueueとを比較してみましょう。 JCToolsキューのパフォーマンス。 JMHは、Sun / Oracle JVMの専門家によるオープンソースのマイクロベンチマークフレームワークであり、コンパイラ/jvm最適化アルゴリズムの非決定論から私たちを保護します。 詳細については、この記事をご覧ください。

以下のコードスニペットでは、読みやすさを向上させるために、いくつかのステートメントが欠落していることに注意してください。 GitHub:で完全なソースコードを見つけてください

public class MpmcBenchmark {

    @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
    public volatile String implementation;

    public volatile Queue<Long> queue;

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(PRODUCER_THREADS_NUMBER)
    public void write(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && !queue.offer(1L)) {
            // intentionally left blank
        }
    }

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(CONSUMER_THREADS_NUMBER)
    public void read(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && queue.poll() == null) {
            // intentionally left blank
        }
    }
}

結果(95パーセンタイル、操作あたりのナノ秒の抜粋):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

MpmcArrayQueueのパフォーマンスはMpmcAtomicArrayQueueよりもわずかに優れており、ArrayBlockingQueueは2倍遅いことがわかります。

7. JCToolsの使用の欠点

JCToolsの使用には、重要な欠点があります。ライブラリクラスが正しく使用されるように強制することはできません。たとえば、大規模で成熟したプロジェクトで MpscArrayQueue を使用し始めたときの状況を考えてみます(単一のコンシューマーが存在する必要があることに注意してください)。

残念ながら、プロジェクトが大きいため、誰かがプログラミングまたは構成エラーを起こし、キューが複数のスレッドから読み取られる可能性があります。 システムは以前と同じように機能しているように見えますが、現在、消費者がいくつかのメッセージを見逃す可能性があります。 これは実際の問題であり、大きな影響を与える可能性があり、デバッグが非常に困難です。

理想的には、JCToolsにスレッドアクセスポリシーを保証させる特定のシステムプロパティを使用してシステムを実行できる必要があります。 例えば ローカル/テスト/ステージング環境(本番環境ではない)でオンになっている可能性があります。 残念ながら、JCToolsはそのようなプロパティを提供していません。

もう1つの考慮事項は、JCToolsがJDKの対応するものよりも大幅に高速であることを確認したとしても、カスタムキューの実装を使用し始めたときと同じ速度でアプリケーションが得られるわけではないということです。 ほとんどのアプリケーションはスレッド間で多くのオブジェクトを交換せず、ほとんどがI/Oバウンドです。

8. 結論

これで、JCToolsが提供するユーティリティクラスの基本を理解し、負荷の高いJDKの対応するクラスと比較してそれらのパフォーマンスがどれほど優れているかを確認しました。

結論として、スレッド間で多くのオブジェクトを交換する場合にのみライブラリを使用する価値があり、それでもスレッドアクセスポリシーを維持するために非常に注意する必要があります。

いつものように、上記のサンプルの完全なソースコードは、GitHubにあります。