1. 概要

この記事では、 LMAX Disruptor を紹介し、低レイテンシでソフトウェアの同時実行性を実現するのにどのように役立つかについて説明します。 また、Disruptorライブラリの基本的な使用法についても説明します。

2. ディスラプターとは何ですか?

Disruptorは、LMAXによって作成されたオープンソースのJavaライブラリです。 これは、多数のトランザクションを処理するための並行プログラミングフレームワークであり、レイテンシーは低く(並行コードの複雑さはありません)。 パフォーマンスの最適化は、基盤となるハードウェアの効率を活用するソフトウェア設計によって実現されます。

2.1. 機械的な共感

機械的共感のコアコンセプトから始めましょう。これは、基盤となるハードウェアがどのように動作するかを理解し、そのハードウェアで最適に動作するようにプログラミングすることです。

たとえば、CPUとメモリの編成がソフトウェアのパフォーマンスにどのように影響するかを見てみましょう。 CPUには、CPUとメインメモリの間にいくつかのキャッシュ層があります。 CPUが操作を実行しているとき、CPUは最初にL1でデータを検索し、次にL2、次にL3、最後にメインメモリを検索します。 遠くに行かなければならないほど、操作に時間がかかります。

同じ操作がデータに対して複数回実行される場合(たとえば、ループカウンター)、そのデータをCPUに非常に近い場所にロードすることは理にかなっています。

キャッシュミスのコストのいくつかの指標値:

CPUから CPUサイクル 時間
メインメモリ 多数 〜60-80 ns
L3キャッシュ 〜40-45サイクル 〜15 ns
L2キャッシュ 〜10サイクル 〜3 ns
L1キャッシュ 〜3-4サイクル 〜1 ns
登録 1サイクル 非常に速い

2.2. なぜキューしないのか

キューの実装では、ヘッド変数、テール変数、およびサイズ変数で書き込みの競合が発生する傾向があります。 キューは通常、コンシューマーとプロデューサーのペースの違いにより、常に満杯に近いか、空に近い状態になっています。 それらは、生産と消費の速度が均等に一致するバランスの取れた中間地で動作することはめったにありません。

書き込みの競合に対処するために、キューは多くの場合ロックを使用します。これにより、カーネルへのコンテキストスイッチが発生する可能性があります。 これが発生すると、関係するプロセッサがキャッシュ内のデータを失う可能性があります。

最高のキャッシュ動作を得るには、任意のメモリ位置に書き込むコアを1つだけにする必要があります(プロセッサはキャッシュ間で特別な高速リンクを使用することが多いため、複数のリーダーで問題ありません)。 キューは、ワンライターの原則に失敗します。

2つの別々のスレッドが2つの異なる値に書き込んでいる場合、各コアは他方のキャッシュラインを無効にします(データはメインメモリとキャッシュの間で、キャッシュラインと呼ばれる固定サイズのブロックで転送されます)。 これは、2つの異なる変数に書き込んでいる場合でも、2つのスレッド間の書き込み競合です。 これは偽共有と呼ばれます。これは、ヘッドにアクセスするたびにテールにもアクセスするためであり、その逆も同様です。

2.3. ディスラプターのしくみ

Disruptorには、配列ベースの循環データ構造(リングバッファー)があります。 これは、次に使用可能なスロットへのポインタを持つ配列です。 事前に割り当てられた転送オブジェクトで埋められます。 プロデューサーとコンシューマーは、ロックや競合なしにリングへのデータの書き込みと読み取りを実行します。

ディスラプターでは、すべてのイベントがすべてのコンシューマーに公開され(マルチキャスト)、個別のダウンストリームキューを介して並行して消費されます。 コンシューマーによる並列処理のため、コンシューマー間の依存関係を調整する必要があります(依存関係グラフ)。

プロデューサーとコンシューマーには、現在作業しているバッファー内のスロットを示すシーケンスカウンターがあります。 各プロデューサー/コンシューマーは独自のシーケンスカウンターを書き込むことができますが、他のシーケンスカウンターを読み取ることはできます。 プロデューサーとコンシューマーはカウンターを読み取り、書き込みたいスロットがロックなしで使用可能であることを確認します。

3. ディスラプターライブラリの使用

3.1. Mavenの依存関係

pom.xmlにDisruptorライブラリの依存関係を追加することから始めましょう。

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

依存関係の最新バージョンはここで確認できます。

3.2. イベントの定義

データを運ぶイベントを定義しましょう:

public static class ValueEvent {
    private int value;
    public final static EventFactory EVENT_FACTORY 
      = () -> new ValueEvent();

    // standard getters and setters
}

EventFactory を使用すると、ディスラプターはイベントを事前に割り当てることができます。

3.3. 消費者

消費者はリングバッファからデータを読み取ります。 イベントを処理するコンシューマーを定義しましょう。

public class SingleEventPrintConsumer {
    ...

    public EventHandler<ValueEvent>[] getEventHandler() {
        EventHandler<ValueEvent> eventHandler 
          = (event, sequence, endOfBatch) 
            -> print(event.getValue(), sequence);
        return new EventHandler[] { eventHandler };
    }
 
    private void print(int id, long sequenceId) {
        logger.info("Id is " + id 
          + " sequence id that was used is " + sequenceId);
    }
}

この例では、コンシューマーはログに印刷しているだけです。

3.4. ディスラプターの構築

ディスラプターを構築します。

ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;

WaitStrategy waitStrategy = new BusySpinWaitStrategy();
Disruptor<ValueEvent> disruptor 
  = new Disruptor<>(
    ValueEvent.EVENT_FACTORY, 
    16, 
    threadFactory, 
    ProducerType.SINGLE, 
    waitStrategy);

Disruptorのコンストラクターでは、以下が定義されています。

  • イベントファクトリ–初期化中にリングバッファに格納されるオブジェクトの生成を担当します
  • リングバッファのサイズ–リングバッファのサイズとして16を定義しました。 2の累乗である必要があります。そうでない場合、初期化中に例外がスローされます。 論理二項演算子を使用してほとんどの操作を簡単に実行できるため、これは重要です。 mod操作
  • スレッドファクトリ–イベントプロセッサ用のスレッドを作成するファクトリ
  • プロデューサータイプ–単一または複数のプロデューサーを使用するかどうかを指定します
  • 待機戦略–プロデューサーのペースに追いついていない遅いサブスクライバーをどのように処理するかを定義します

コンシューマーハンドラーを接続します。

disruptor.handleEventsWith(getEventHandler());

複数のコンシューマーにDisruptorを提供して、プロデューサーによって生成されたデータを処理することができます。 上記の例では、消費者は1人だけです。 イベントハンドラ。

3.5. ディスラプターの起動

ディスラプタを起動するには:

RingBuffer<ValueEvent> ringBuffer = disruptor.start();

3.6. イベントの作成と公開

プロデューサーは、データを順番にリングバッファーに配置します。 プロデューサーは、まだ消費されていないデータを上書きしないように、次に使用可能なスロットを認識している必要があります。

DisruptorのRingBufferを使用して公開します。

for (int eventCount = 0; eventCount < 32; eventCount++) {
    long sequenceId = ringBuffer.next();
    ValueEvent valueEvent = ringBuffer.get(sequenceId);
    valueEvent.setValue(eventCount);
    ringBuffer.publish(sequenceId);
}

ここでは、プロデューサーがアイテムを順番に作成および公開しています。 ここで重要なのは、Disruptorが2フェーズコミットプロトコルと同様に機能することです。 新しいsequenceIdを読み取り、公開します。 次回は、次のsequenceIdとしてsequenceId +1を取得する必要があります。

4. 結論

このチュートリアルでは、ディスラプターとは何か、およびディスラプターが低レイテンシーで並行性を実現する方法について説明しました。 機械的な共感の概念と、それを利用して低遅延を実現する方法を見てきました。 次に、Disruptorライブラリを使用した例を見てきました。

サンプルコードはGitHubプロジェクトにあります。これはMavenベースのプロジェクトであるため、そのままインポートして実行するのは簡単です。