1概要

この記事ではhttps://lmax-exchange.github.io/disruptor/[LMAX Disruptor]を紹介し、低遅延でソフトウェアの並行性を実現する方法について説明します。また、Disruptorライブラリの基本的な使い方も見ていきます。


2ディスラプターとは何ですか?

ディスラプターはLMAXによって書かれたオープンソースのJavaライブラリです。これは、待ち時間が少ない(そして、並行コードの複雑さを伴うことなく)多数のトランザクションを処理するための並行プログラミングフレームワークです。パフォーマンスの最適化は、基盤となるハードウェアの効率を利用するソフトウェア設計によって実現されています。


2.1. 機械的な同情


mechanical sympathy

の中核となる概念から始めましょう。それは、基礎となるハードウェアがどのように動作するかを理解し、そのハードウェアで最適に機能するようにプログラミングすることです。

たとえば、CPUとメモリの構成がソフトウェアのパフォーマンスにどのように影響するかを見てみましょう。 CPUはメインメモリとの間にいくつかのキャッシュレイヤを持っています。 CPUが操作を実行しているとき、まずデータをL1で調べ、次にL2、次にL3、そして最後にメインメモリを調べます。さらに進む必要があるほど、操作に時間がかかります。

1つのデータに対して同じ操作(ループカウンタなど)を複数回実行する場合は、そのデータをCPUに非常に近い場所にロードするのが妥当です。

キャッシュミスのコストを示すいくつかの指標的な数値:

| =================================== | CPUからCPUサイクルまでの待ち時間| CPUサイクル|時間|メインメモリ|複数|〜60-80 ns | L3キャッシュ|〜40-45サイクル|〜15 ns | L2キャッシュ|〜10サイクル|〜3 ns | L1キャッシュ|〜3-4サイクル|〜1 ns |レジスタ| 1 cycle |とても速い| =======================================


2.2. キューに入らないのはなぜ

キューの実装では、先頭、末尾、およびサイズの変数で書き込み競合が発生する傾向があります。キューは通常、消費者とプロデューサの間のペースの違いにより、常にフルに近い、または空に近いです。

生産と消費の割合が均等に釣り合った均衡の取れた中間地でそれらが操業することはめったにない。

書き込みの競合に対処するために、キューはしばしばロックを使用します。これはカーネルへのコンテキスト切り替えを引き起こす可能性があります。これが発生すると、関係するプロセッサはキャッシュ内のデータを失う可能性があります。

最良のキャッシュ動作を実現するには、設計は任意のメモリ位置にコアを1つだけ書き込む必要があります(プロセッサはキャッシュ間で特別な高速リンクを使用することが多いため、複数のリーダーで結構です)。キューは1人の作家の原則に失敗します。

2つの別々のスレッドが2つの異なる値を書き込んでいる場合、各コアは他方のキャッシュラインを無効にします(データはキャッシュラインと呼ばれる固定サイズのブロック単位でメインメモリとキャッシュ間で転送されます)。 2つのスレッドが2つの異なる変数に書き込んでいても、それは2つのスレッド間の書き込み競合です。これは偽の共有と呼ばれます。なぜなら、ヘッドがアクセスされるたびにテールもアクセスされ、またその逆も同様です。


2.3. ディスラプターの仕組み

ディスラプターは配列ベースの循環データ構造(リングバッファ)を持っています。これは、次に利用可能なスロットへのポインタを持つ配列です。事前に割り当てられた転送オブジェクトがいっぱいです。プロデューサとコンシューマは、ロックや競合なしにリングへのデータの書き込みと読み取りを実行します。

ディスラプターでは、すべてのイベントがすべてのコンシューマーに発行され(マルチキャスト)、別々のダウンストリームキューを介して並行して消費されます。消費者による並列処理のために、消費者間の依存関係を調整することが必要である(依存グラフ)。

プロデューサとコンシューマには、バッファ内の現在作業中のスロットを示すシーケンスカウンタがあります。各プロデューサ/コンシューマは独自のシーケンスカウンタを書き込むことができますが、他のシーケンスカウンタを読み取ることはできます。

プロデューサとコンシューマはカウンタを読み取り、書き込みたいスロットがロックなしで使用可能であることを確認します。


3ディスラプターライブラリーの使用


3.1. Mavenの依存関係


pom.xml

にDisruptorライブラリの依存関係を追加することから始めましょう。

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

依存関係の最新版はhttps://search.maven.org/classic/#search%7C1%7Cg%3A%22com.lmax%22%20AND%20a%3A%22disruptor%22[here]で確認できます。


3.2. イベントの定義

データを運ぶイベントを定義しましょう。

public static class ValueEvent {
    private int value;
    public final static EventFactory EVENT__FACTORY
      = () -> new ValueEvent();

   //standard getters and setters
}


EventFactory

はディスラプターにイベントの事前割り当てをさせます。


3.3. 消費者

消費者はリングバッファからデータを読みます。イベントを処理するコンシューマを定義しましょう。

public class SingleEventPrintConsumer {
    ...

    public EventHandler<ValueEvent>[]getEventHandler() {
        EventHandler<ValueEvent> eventHandler
          = (event, sequence, endOfBatch)
            -> print(event.getValue(), sequence);
        return new EventHandler[]{ eventHandler };
    }

    private void print(int id, long sequenceId) {
        logger.info("Id is " + id
          + " sequence id that was used is " + sequenceId);
    }
}

この例では、消費者は単にログに印刷しています。


3.4. ディスラプターの構築

ディスラプターを構築します。

ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;

WaitStrategy waitStrategy = new BusySpinWaitStrategy();
Disruptor<ValueEvent> disruptor
  = new Disruptor<>(
    ValueEvent.EVENT__FACTORY,
    16,
    threadFactory,
    ProducerType.SINGLE,
    waitStrategy);

Disruptorのコンストラクタでは、以下が定義されています。

  • Event Factory – 生成されるオブジェクトを生成します

初期化中にリングバッファに格納される
リングバッファのサイズ – リングのサイズとして16を定義しました

バッファ。それは2の累乗でなければなりませんそれ以外の場合は例外をスローします
初期化それはのほとんどを実行するのは簡単ですのでこれは重要です
論理二項演算子を使用した演算モッドオペレーション
** Thread Factory – イベントプロセッサ用のスレッドを作成するための工場

  • Producer Type – シングルかマルチかを指定します

生産者
** 待機戦略 – 低速の加入者をどのように処理したいかを定義します。

生産者のペースについていけない人

コンシューマハンドラを接続します。

disruptor.handleEventsWith(getEventHandler());

プロデューサーによって作成されたデータを処理するために複数のコンシューマーにDisruptorを提供することが可能です。上記の例では、コンシューマa.k.a.イベントハンドラは1つだけです。


3.5. ディスラプターの起動

ディスラプターを起動するには:

RingBuffer<ValueEvent> ringBuffer = disruptor.start();


3.6. イベントの制作と公開

プロデューサは、データを順番にリングバッファに入れます。プロデューサは、まだ消費されていないデータを上書きしないように、次に利用可能なスロットを認識する必要があります。

発行にはDisruporの

RingBuffer

を使用します。

for (int eventCount = 0; eventCount < 32; eventCount++) {
    long sequenceId = ringBuffer.next();
    ValueEvent valueEvent = ringBuffer.get(sequenceId);
    valueEvent.setValue(eventCount);
    ringBuffer.publish(sequenceId);
}

ここでは、プロデューサーが順番にアイテムを作成して公開しています。 Disruptorは2フェーズコミットプロトコルと同様に機能することに注意してください。それは新しい

sequenceId

を読み、公開します。次回は

sequenceId

+ 1を次の__sequenceIdとして取得する必要があります。


4結論

このチュートリアルでは、ディスラプターとは何か、そして低レイテンシーで並行処理を実現する方法を説明しました。我々は、機械的な共感の概念と、それが低レイテンシを達成するためにどのように利用されるのかを見てきた。それから、Disruptorライブラリを使った例を見ました。

サンプルコードはhttps://github.com/eugenp/tutorials/tree/master/disruptor[GitHubプロジェクト]にあります。これはMavenベースのプロジェクトなので、そのままインポートして実行するのが簡単です。