1. 概要

この記事では、競合のないレプリケートされたデータ型(CRDT)と、Javaでそれらを操作する方法について説明します。 この例では、wurmloch-crdtライブラリの実装を使用します。

分散システムにNレプリカノードのクラスターがある場合、ネットワークパーティションが発生する可能性があります—一部のノードは一時的に相互に通信できません。 この状況はスプリットブレインと呼ばれます。

システムにスプリットブレインがある場合、一部の書き込み要求は、同じユーザーであっても、相互に接続されていない異なるレプリカに送信される可能性があります。 このような状況が発生した場合でも、システムは引き続き使用できますが、一貫性がありません

2つの分割されたクラスター間のネットワークが再び機能し始めたときに、一貫性のない書き込みとデータをどう処理するかを決定する必要があります。

2. 競合のない複製されたデータ型をレスキューに

スプリットブレインのために切断された2つのノードABを考えてみましょう。

ユーザーがログインを変更し、リクエストがノードAに送信されたとします。 その後、再度変更することにしましたが、今回はノードBにリクエストが送信されます。

スプリットブレインのため、2つのノードは接続されていません。 ネットワークが再び機能しているときに、このユーザーのログインがどのように表示されるかを決定する必要があります。

いくつかの戦略を利用できます。競合を解決する機会をユーザーに提供するか(Googleドキュメントで行われるように)、分岐したレプリカからのデータをマージするためにCRDTを使用できます[ X205X]私たちのために。

3. Mavenの依存関係

まず、便利なCRDTのセットを提供する依存関係をライブラリに追加しましょう。

<dependency>
    <groupId>com.netopyr.wurmloch</groupId>
    <artifactId>wurmloch-crdt</artifactId>
    <version>0.1.0</version>
</dependency>

最新バージョンはMavenCentralにあります。

4. 成長専用セット

最も基本的なCRDTは、成長専用セットです。 要素はGSetにのみ追加でき、削除することはできません。 GSet が分岐する場合、2つのセットの和集合を計算することで簡単にマージできます。

まず、分散データ構造をシミュレートする2つのレプリカを作成し、 connect()メソッドを使用してこれら2つのレプリカを接続しましょう。

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

クラスタに2つのレプリカを取得したら、最初のレプリカに GSet を作成し、2番目のレプリカでそれを参照できます。

GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();

この時点で、クラスターは期待どおりに機能しており、2つのレプリカ間にアクティブな接続があります。 2つの異なるレプリカからセットに2つの要素を追加し、セットに両方のレプリカに同じ要素が含まれていることを表明できます。

replica1.add("apple");
replica2.add("banana");

assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");

突然ネットワークパーティションができて、最初のレプリカと2番目のレプリカの間に接続がなくなったとしましょう。 disconnect()メソッドを使用してネットワークパーティションをシミュレートできます。

crdtStore1.disconnect(crdtStore2);

次に、両方のレプリカからデータセットに要素を追加すると、それらの間に接続がないため、これらの変更はグローバルに表示されません。

replica1.add("strawberry");
replica2.add("pear");

assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");

両方のクラスターメンバー間の接続が再び確立されると、GSetは両方のセットのユニオンを使用して内部でマージされ、両方のレプリカは再び一貫性があります。

crdtStore1.connect(crdtStore2);

assertThat(replica1)
  .contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
  .contains("apple", "banana", "strawberry", "pear");

5. 増分専用カウンター

増分のみのカウンターは、各ノードですべての増分をローカルに集約するCRDTです。

レプリカが同期するとき、ネットワークパーティションの後、結果の値はすべてのノードのすべての増分を合計することによって計算されます —これはjava.concurrentLongAdderに似ていますがより高い抽象化レベルで。

GCounter を使用して増分専用カウンターを作成し、両方のレプリカから増分してみましょう。 合計が正しく計算されていることがわかります。

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();

replica1.increment();
replica2.increment(2L);

assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);

両方のクラスターメンバーを切断してローカルインクリメント操作を実行すると、値に一貫性がないことがわかります。

crdtStore1.disconnect(crdtStore2);

replica1.increment(3L);
replica2.increment(5L);

assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);

ただし、クラスターが再び正常になると、増分がマージされ、適切な値が生成されます。

crdtStore1.connect(crdtStore2);

assertThat(replica1.get())
  .isEqualTo(11L);
assertThat(replica2.get())
  .isEqualTo(11L);

6. PNカウンター

インクリメントのみのカウンターにも同様のルールを使用して、インクリメントとデクリメントの両方が可能なカウンターを作成できます。 PNCounter は、すべてのインクリメントとデクリメントを別々に格納します。

レプリカが同期すると、結果の値はすべての増分の合計からすべての減分の合計を引いたものに等しくなります。

@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
    LocalCrdtStore crdtStore1 = new LocalCrdtStore();
    LocalCrdtStore crdtStore2 = new LocalCrdtStore();
    crdtStore1.connect(crdtStore2);

    PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
    PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();

    replica1.increment();
    replica2.decrement(2L);

    assertThat(replica1.get()).isEqualTo(-1L);
    assertThat(replica2.get()).isEqualTo(-1L);

    crdtStore1.disconnect(crdtStore2);

    replica1.decrement(3L);
    replica2.increment(5L);

    assertThat(replica1.get()).isEqualTo(-4L);
    assertThat(replica2.get()).isEqualTo(4L);

    crdtStore1.connect(crdtStore2);

    assertThat(replica1.get()).isEqualTo(1L);
    assertThat(replica2.get()).isEqualTo(1L);
}

7. Last-Writer-Wins Register

より複雑なビジネスルールがあり、セットやカウンターでの操作が不十分な場合があります。 Last-Writer-Wins Registerを使用できます。これは、分岐したデータセットをマージするときに最後に更新された値のみを保持します。 Cassandraは、この戦略を使用して競合を解決します。

この戦略を使用する場合は、の間に発生した変更が削除されるため、に非常に注意する必要があります

LWWRegisterクラスの2つのレプリカとインスタンスのクラスターを作成しましょう。

LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);

LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();

replica1.set("apple");
replica2.set("banana");

assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");

最初のレプリカが値をappleに設定し、2番目のレプリカが値を bananaに変更すると、 LWWRegisterは最後の値のみを保持します。

クラスタが切断された場合に何が起こるか見てみましょう。

crdtStore1.disconnect(crdtStore2);

replica1.set("strawberry");
replica2.set("pear");

assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");

各レプリカは、一貫性のないデータのローカルコピーを保持します。 set()メソッドを呼び出すと、 LWWRegister は、VectorClockアルゴリズムを使用してすべてに特定の更新を識別する特別なバージョン値を内部的に割り当てます。

クラスターが同期すると、は最高バージョンの値を取得しおよび以前のすべての更新を破棄します

crdtStore1.connect(crdtStore2);

assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");

8. 結論

この記事では、可用性を維持しながら分散システムの一貫性の問題を示しました。

ネットワークパーティションの場合、クラスターが同期されたときに分岐したデータをマージする必要があります。 CRDTを使用して分岐データのマージを実行する方法を見ました。

これらの例とコードスニペットはすべて、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。