1. 概要

このチュートリアルでは、Javaでリングバッファを実装する方法を学習します。

2. リングバッファ

リングバッファ(または循環バッファ)は、2つ以上のスレッド間でデータをバッファリングするために使用される境界付き循環データ構造です。 リングバッファに書き込みを続けると、最後に到達するとラップアラウンドします。

2.1. 使い方

リングバッファは、境界でラップアラウンドする固定サイズの配列を使用して実装されます

アレイとは別に、次の3つのことを追跡します。

  • 要素を挿入するためにバッファ内で次に使用可能なスロット、
  • バッファ内の次の未読要素、
  • 配列の終わり–バッファが配列の先頭にラップアラウンドするポイント

リングバッファがこれらの要件を処理する方法の仕組みは、実装によって異なります。 たとえば、件名の Wikipedia エントリは、4つのポインタを使用する方法を示しています。

シーケンスを使用したDisruptorのリングバッファの実装からアプローチを借用します。

最初に知っておく必要があるのは、容量、つまりバッファーの固定最大サイズです。 次に、2つの単調に増加するシーケンスを使用します。

  • 書き込みシーケンス:-1から開始し、要素を挿入すると1ずつ増加します
  • 読み取りシーケンス:0から開始し、要素を消費するにつれて1ずつ増加します

mod演算を使用して、シーケンスを配列内のインデックスにマップできます。

arrayIndex = sequence % capacity

mod操作は、シーケンスを境界の周りにラップして、バッファにスロットを導き出します。

要素を挿入する方法を見てみましょう。

buffer[++writeSequence % capacity] = element

要素を挿入する前に、シーケンスを事前にインクリメントしています。

要素を消費するために、ポストインクリメントを行います。

element = buffer[readSequence++ % capacity]

この場合、シーケンスに対してポストインクリメントを実行します。 要素を消費しても、その要素はバッファから削除されません。上書きされるまで、配列内にとどまります

2.2. 空のバッファと満杯のバッファ

配列をラップアラウンドすると、バッファ内のデータの上書きが開始されます。バッファがいっぱいの場合、リーダーがデータを消費したかどうかに関係なく、最も古いデータを上書きするか、保持しているデータの上書きを防ぐかを選択できます。未読

読者が中間値または古い値(たとえば、株価ティッカー)を見逃す余裕がある場合は、データが消費されるのを待たずにデータを上書きできます。 一方、リーダーがすべての値を消費する必要がある場合(eコマーストランザクションの場合など)、バッファーに使用可能なスロットができるまで待機(ブロック/ビジー待機)する必要があります。

バッファのサイズが容量に等しい場合、バッファはいっぱいです。ここで、そのサイズは未読要素の数に等しくなります。

size = (writeSequence - readSequence) + 1
isFull = (size == capacity)

書き込みシーケンスが読み取りシーケンスより遅れている場合、バッファは空です

isEmpty = writeSequence < readSequence

バッファが空の場合、バッファはnull値を返します。

2.2. 長所と短所

リングバッファは効率的なFIFOバッファです。 事前に事前に割り当てることができる固定サイズのアレイを使用し、効率的なメモリアクセスパターンを可能にします。 すべてのバッファ操作は、要素のシフトを必要としないため、要素の消費を含め、一定時間 O(1)です。

反対に、リングバッファの正しいサイズを決定することは重要です。 たとえば、バッファのサイズが小さく、読み取りが遅い場合、書き込み操作が長時間ブロックされる可能性があります。 動的なサイズ設定を使用できますが、データを移動する必要があり、上記の利点のほとんどを見逃してしまいます。

3. Javaでの実装

リングバッファがどのように機能するかを理解したので、Javaでの実装に進みましょう。

3.1. 初期化

まず、事前定義された容量でバッファーを初期化するコンストラクターを定義しましょう。

public CircularBuffer(int capacity) {
    this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity;
    this.data = (E[]) new Object[this.capacity];
    this.readSequence = 0;
    this.writeSequence = -1;
}

これにより、前のセクションで説明したように、空のバッファが作成され、シーケンスフィールドが初期化されます。

3.2. オファー

次に、次の使用可能なスロットのバッファに要素を挿入し、成功するとtrueを返すoffer操作を実装します。 バッファが空のスロットを見つけることができない場合、つまり未読の値を上書きできない場合falseを返します。

Javaにofferメソッドを実装しましょう。

public boolean offer(E element) {
    boolean isFull = (writeSequence - readSequence) + 1 == capacity;
    if (!isFull) {
        int nextWriteSeq = writeSequence + 1;
        data[nextWriteSeq % capacity] = element;
        writeSequence++;
        return true;
    }
    return false;
}

そのため、書き込みシーケンスをインクリメントし、次に使用可能なスロットの配列内のインデックスを計算しています。 次に、データをバッファに書き込み、更新された書き込みシーケンスを保存します。

試してみましょう:

@Test
public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() {
    CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);

    assertTrue(buffer.offer("Square"));
    assertEquals(1, buffer.size());
}

3.3. 投票

最後に、次の未読要素を取得して削除するpoll操作を実装します。 ポーリング操作は要素を削除しませんが、読み取りシーケンスをインクリメントします

それを実装しましょう:

public E poll() {
    boolean isEmpty = writeSequence < readSequence;
    if (!isEmpty) {
        E nextValue = data[readSequence % capacity];
        readSequence++;
        return nextValue;
    }
    return null;
}

ここでは、配列内のインデックスを計算することにより、現在の読み取りシーケンスでデータを読み取っています。 次に、バッファが空でない場合は、シーケンスをインクリメントして値を返します。

それをテストしてみましょう:

@Test
public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() {
    CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);
    buffer.offer("Triangle");
    String shape = buffer.poll();

    assertEquals("Triangle", shape);
}

4. 生産者/消費者問題

2つ以上のスレッド間でデータを交換するためのリングバッファの使用について説明しました。これは、Producer-Consumer問題と呼ばれる同期問題の例です。 Javaでは、セマフォバウンドキュー、リングバッファなどを使用して、さまざまな方法で生産者/消費者問題を解決できます。

リングバッファに基づくソリューションを実装しましょう。

4.1. volatileシーケンスフィールド

リングバッファの実装はスレッドセーフではありません。 単純な単一のプロデューサーと単一のコンシューマーの場合にスレッドセーフにしましょう。

プロデューサーはデータをバッファーに書き込み、 writeSequence をインクリメントしますが、コンシューマーはバッファーから読み取り、readSequenceをインクリメントします。 したがって、バッキングアレイは競合がなく、同期なしで回避できます。

ただし、コンシューマーが writeSequence フィールドの最新値( visibility )を確認できること、およびデータが更新される前にwriteSequenceが更新されないことを確認する必要があります。バッファで実際に利用可能(注文)。

この場合、シーケンスフィールドを揮発性にすることでリングバッファを同時かつロックフリーにすることができます

private volatile int writeSequence = -1, readSequence = 0;

offer メソッドでは、volatileフィールドへの書き込みwriteSequenceは、シーケンスを更新する前にバッファーへの書き込みが行われることを保証します。 同時に、 volatile の可視性保証により、コンシューマーは常にwriteSequenceの最新値を確認できます。

4.2. プロデューサー

リングバッファに書き込む単純なプロデューサRunnableを実装しましょう。

public void run() {
    for (int i = 0; i < items.length;) {
        if (buffer.offer(items[i])) {
           System.out.println("Produced: " + items[i]);
            i++;
        }
    }
}

プロデューサースレッドは、ループ内の空のスロットを待機します(ビジー待機)。

4.3. 消費者

バッファーから読み取るコンシューマーCallableを実装します。

public T[] call() {
    T[] items = (T[]) new Object[expectedCount];
    for (int i = 0; i < items.length;) {
        T item = buffer.poll();
        if (item != null) {
            items[i++] = item;
            System.out.println("Consumed: " + item);
        }
    }
    return items;
}

コンシューマスレッドは、バッファから null 値を受け取った場合、印刷せずに続行します。

ドライバーコードを書いてみましょう。

executorService.submit(new Thread(new Producer<String>(buffer)));
executorService.submit(new Thread(new Consumer<String>(buffer)));

生産者/消費者プログラムを実行すると、次のような出力が生成されます。

Produced: Circle
Produced: Triangle
  Consumed: Circle
Produced: Rectangle
  Consumed: Triangle
  Consumed: Rectangle
Produced: Square
Produced: Rhombus
  Consumed: Square
Produced: Trapezoid
  Consumed: Rhombus
  Consumed: Trapezoid
Produced: Pentagon
Produced: Pentagram
Produced: Hexagon
  Consumed: Pentagon
  Consumed: Pentagram
Produced: Hexagram
  Consumed: Hexagon
  Consumed: Hexagram

5. 結論

このチュートリアルでは、リングバッファーを実装する方法を学び、それを使用して生産者/消費者問題を解決する方法を探りました。

いつものように、すべての例のソースコードは、GitHubから入手できます。