1. 概要

この記事では、分散型のフォールトトレラントレコードストレージシステムを実装するサービスであるBookKeeperを紹介します。

2. BookKeeper とは何ですか?

BookKeeperは、もともとYahooによって ZooKeeper サブプロジェクトとして開発され、2015年にトップレベルのプロジェクトになりました。 BookKeeperは、その中核として、ログエントリ(別名レコード)のシーケンスをLedgersと呼ばれるデータ構造に格納する信頼性の高い高性能システムを目指しています。

元帳の重要な機能は、追加のみで不変であるという事実です。 これにより、BookKeeperは、分散ログシステム、Pub-Subメッセージングアプリケーション、リアルタイムストリーム処理などの特定のアプリケーションに適しています。

3. BookKeeperの概念

3.1. ログエントリ

ログエントリには、クライアントアプリケーションがBookKeeperに保存したり、BookKeeperから読み取ったりする分割できないデータの単位が含まれます。元帳に保存される場合、各エントリには、提供されたデータといくつかのメタデータフィールドが含まれます。

これらのメタデータフィールドには、 entryId、が含まれます。これらは、特定の元帳内で一意である必要があります。 BookKeeperがエントリが破損している、または改ざんされていることを検出するために使用する認証コードもあります。

BookKeeperはそれ自体ではシリアル化機能を提供しないため、クライアントは独自の方法を考案して、高レベルの構成をbyte配列との間で変換する必要があります。

3.2. 元帳

元帳は、BookKeeperによって管理される基本的なストレージユニットであり、ログエントリの順序付けられたシーケンスを格納します。 前述のように、元帳には追加専用のセマンティクスがあります。つまり、元帳に追加されたレコードは変更できません。

また、クライアントが元帳への書き込みを停止して閉じると、BookKeeperが元帳を封印し、後でデータを追加できなくなります。 これは、BookKeeperを中心にアプリケーションを設計するときに覚えておくべき重要なポイントです。 元帳は、キューなどの高レベルの構成を直接実装するのに適した候補ではありません。 代わりに、これらの高レベルの概念をサポートするより基本的なデータ構造を作成するために元帳がより頻繁に使用されることがわかります。

たとえば、ApacheのDistributedLog プロジェクトは、ログセグメントとして元帳を使用します。 これらのセグメントは分散ログに集約されますが、基礎となる元帳は通常のユーザーに対して透過的です。

BookKeeperは、複数のサーバーインスタンス間でログエントリを複製することにより、元帳の復元力を実現します。 3つのパラメーターは、保持されるサーバーとコピーの数を制御します。

  • アンサンブルサイズ:元帳データの書き込みに使用されるサーバーの数
  • 書き込みクォーラムサイズ:特定のログエントリを複製するために使用されるサーバーの数
  • 確認応答サイズ:特定のログエントリの書き込み操作を確認する必要があるサーバーの数

これらのパラメーターを調整することにより、特定の元帳のパフォーマンスと復元力の特性を調整できます。 元帳に書き込む場合、BookKeeperは、クラスターメンバーの最小クォーラムがそれを確認した場合にのみ操作が成功したと見なします。

BookKeeperは、内部メタデータに加えて、元帳へのカスタムメタデータの追加もサポートしています。 これらは、クライアントが作成時に渡すキーと値のペアのマップであり、BookKeeperはZooKeeperに独自のストアと一緒に格納されます。

3.3. ブックメーカー

ブックメーカーは、1つまたはモードの元帳を保持するサーバーです。 BookKeeperクラスターは、特定の環境で実行される多数のブックメーカーで構成され、プレーンなTCPまたはTLS接続を介してクライアントにサービスを提供します。

Bookiesは、ZooKeeperが提供するクラスターサービスを使用してアクションを調整します。 これは、完全にフォールトトレラントなシステムを実現する場合、少なくとも3インスタンスのZooKeeperと3インスタンスのBookKeeperのセットアップが必要であることを意味します。 このようなセットアップは、少なくともデフォルトの元帳セットアップ(3ノードアンサンブルサイズ、2ノード書き込みクォーラム、および2ノードackクォーラム)では、単一のインスタンスに障害が発生しても正常に動作できる場合の損失を許容できます。

4. ローカルセットアップ

BookKeeperをローカルで実行するための基本的な要件は非常に控えめです。 まず、BookKeeperの元帳メタデータストレージを提供するZooKeeperインスタンスを稼働させる必要があります。 次に、実際のサービスをクライアントに提供するブックメーカーを展開します。

これらの手順を手動で実行することは確かに可能ですが、ここでは、公式のApacheイメージを使用してこのタスクを簡略化するdocker-composeファイルを使用します。

$ cd <path to docker-compose.yml>
$ docker-compose up

このdocker-composeは、3つのブックメーカーと1つのZooKeeperインスタンスを作成します。 すべてのブックメーカーは同じマシンで実行されるため、テスト目的でのみ役立ちます。 公式ドキュメントには、完全にフォールトトレラントなクラスターを構成するために必要な手順が含まれています。

ブックキーパーのシェルコマンドlistbookiesを使用して、基本的なテストを実行して、期待どおりに機能していることを確認しましょう。

$ docker exec -it apache-bookkeeper_bookie_1 /opt/bookkeeper/bin/bookkeeper \
  shell listbookies -readwrite
ReadWrite Bookies :
192.168.99.101(192.168.99.101):4181
192.168.99.101(192.168.99.101):4182
192.168.99.101(192.168.99.101):3181

出力には、3つのブックで構成される利用可能なブックのリストが表示されます。 表示されるIPアドレスは、ローカルのDockerインストールの詳細に応じて変わることに注意してください。

5. LedgerAPIの使用

Ledger APIは、BookKeeperとインターフェイスするための最も基本的な方法です。 Ledger オブジェクトと直接対話できますが、一方で、ストリームなどの高レベルの抽象化を直接サポートしていません。 これらのユースケースのために、BookKeeperプロジェクトは、これらの機能をサポートする別のライブラリ、DistributedLogを提供します。

Ledger APIを使用するには、bookkeeper-server依存関係をプロジェクトに追加する必要があります。

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server</artifactId>
    <version>4.10.0</version>
</dependency>

注:ドキュメントに記載されているように、この依存関係を使用すると、protobufおよびguavaライブラリの依存関係も含まれます。 プロジェクトでこれらのライブラリも必要であるが、BookKeeperで使用されているものとは異なるバージョンの場合は、これらのライブラリをシェーディングする代替依存関係を使用できます。

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server-shaded</artifactId>
    <version>4.10.0</version>
</dependency>

5.1. Bookiesへの接続

BookKeeperクラスはLedgerAPIのメインエントリポイントであり、BookKeeperサービスに接続するためのいくつかのメソッドを提供します。 最も単純な形式では、このクラスの新しいインスタンスを作成し、BookKeeperが使用するZooKeeperサーバーの1つのアドレスを渡すだけです。

BookKeeper client = new BookKeeper("zookeeper-host:2131");

ここで、 zookeeper-host は、BookKeeperのクラスター構成を保持するZooKeeperサーバーのIPアドレスまたはホスト名に設定する必要があります。 この場合、これは通常「localhost」またはDOCKER_HOST環境変数が指すホストです。

クライアントを微調整するために使用できるいくつかのパラメーターをさらに制御する必要がある場合は、 ClientConfiguration インスタンスを使用して、それを使用してクライアントを作成できます。

ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2131");

// ... set other properties
 
BookKeeper.forConfig(cfg).build();

5.2. 元帳の作成

BookKeeper インスタンスができたら、新しい元帳を作成するのは簡単です。

LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC,"password".getBytes());

ここでは、このメソッドの最も単純な変形を使用しました。エントリの整合性を確保するために、MACダイジェストタイプを使用して、デフォルト設定で新しい元帳を作成します。

元帳にカスタムメタデータを追加する場合は、すべてのパラメーターを受け取るバリアントを使用する必要があります。

LedgerHandle lh = bk.createLedger(
  3,
  2,
  2,
  DigestType.MAC,
  "password".getBytes(),
  Collections.singletonMap("name", "my-ledger".getBytes()));

今回は、 createLedger()メソッドのフルバージョンを使用しました。 最初の3つの引数は、それぞれアンサンブルサイズ、書き込みクォーラム、およびackクォーラム値です。 次に、以前と同じダイジェストパラメータがあります。 最後に、カスタムメタデータを使用してMapを渡します。

上記のどちらの場合も、createLedgerは同期操作です。 BookKeeperは、コールバックを使用した非同期元帳の作成も提供します。

bk.asyncCreateLedger(
  3,
  2,
  2,
  BookKeeper.DigestType.MAC, "passwd".getBytes(),
  (rc, lh, ctx) -> {
      // ... use lh to access ledger operations
  },
  null,
  Collections.emptyMap());

BookKeeperの新しいバージョン(> = 4.6)も、同じ目標を達成するために流暢なスタイルのAPIとCompleteableFutureをサポートしています。

CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
  .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
  .withPassword("password".getBytes())
  .execute();

この場合、LedgerHandleの代わりにWriteHandleを取得することに注意してください。 後で説明するように、 LedgerHandle implements WriteHandle。として、これらのいずれかを使用して元帳にアクセスできます。

5.3. データの書き込み

LedgerHandleまたはWriteHandleを取得したら、 append()メソッドバリアントの1つを使用して、関連する元帳にデータを書き込みます。 同期バリアントから始めましょう:

for(int i = 0; i < MAX_MESSAGES; i++) {
    byte[] data = new String("message-" + i).getBytes();
    lh.append(data);
}

ここでは、byte配列を使用するバリアントを使用しています。 APIは、NettyのByteBufおよびJavaNIOのByteBufferもサポートします。これにより、タイムクリティカルなシナリオでのメモリ管理が向上します。

非同期操作の場合、取得した特定のハンドルタイプに応じてAPIが少し異なります。 WriteHandleCompletableFutureを使用しますが、LedgerHandleもコールバックベースのメソッドをサポートします。

// Available in WriteHandle and LedgerHandle
CompletableFuture<Long> f = lh.appendAsync(data);

// Available only in LedgerHandle
lh.asyncAddEntry(
  data,
  (rc,ledgerHandle,entryId,ctx) -> {
      // ... callback logic omitted
  },
  null);

どちらを選択するかは主に個人的な選択ですが、一般的に、CompleteableFutureベースのAPIを使用すると読みやすくなる傾向があります。 また、 Mono を直接構築できるという副次的な利点があり、BookKeeperをリアクティブアプリケーションに簡単に統合できます。

5.4. データの読み取り

BookKeeper元帳からのデータの読み取りは、書き込みと同じように機能します。 まず、 BookKeeper インスタンスを使用して、 LedgerHandle を作成します。

LedgerHandle lh = bk.openLedger(
  ledgerId, 
  BookKeeper.DigestType.MAC,
  ledgerPassword);

後で説明するledgerIdパラメーターを除いて、このコードは前に見た createLedger()メソッドによく似ています。 ただし、重要な違いがあります。 このメソッドは、読み取り専用のLedgerHandleインスタンスを返します。 使用可能なappend()メソッドのいずれかを使用しようとすると、例外が発生します。

または、より安全な方法は、流暢なスタイルのAPIを使用することです。

ReadHandle rh = bk.newOpenLedgerOp()
  .withLedgerId(ledgerId)
  .withDigestType(DigestType.MAC)
  .withPassword("password".getBytes())
  .execute()
  .get();

ReadHandle には、元帳からデータを読み取るために必要なメソッドがあります。

long lastId = lh.readLastConfirmed();
rh.read(0, lastId).forEach((entry) -> {
    // ... do something 
});

ここでは、同期 read バリアントを使用して、この元帳で使用可能なすべてのデータを要求しただけです。 予想通り、非同期バリアントもあります。

rh.readAsync(0, lastId).thenAccept((entries) -> {
    entries.forEach((entry) -> {
        // ... process entry
    });
});

古いopenLedger()メソッドを使用することを選択した場合、非同期メソッドのコールバックスタイルをサポートする追加のメソッドが見つかります。

lh.asyncReadEntries(
  0,
  lastId,
  (rc,lh,entries,ctx) -> {
      while(entries.hasMoreElements()) {
          LedgerEntry e = ee.nextElement();
      }
  },
  null);

5.5. 元帳の一覧表示

データを開いて読み取るには、元帳のidが必要であることを以前に確認しました。 では、どうすれば入手できますか? 1つの方法は、BookKeeperインスタンスからアクセスできるLedgerManagerインターフェイスを使用することです。 このインターフェイスは基本的に元帳メタデータを処理しますが、 asyncProcessLedgers()メソッドもあります。 この方法(およびいくつかのヘルプは同時プリミティブを形成する)を使用して、使用可能なすべての元帳を列挙できます。

public List listAllLedgers(BookKeeper bk) {
    List ledgers = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch processDone = new CountDownLatch(1);

    bk.getLedgerManager()
      .asyncProcessLedgers(
        (ledgerId, cb) -> {
            ledgers.add(ledgerId);
            cb.processResult(BKException.Code.OK, null, null);
        }, 
        (rc, s, obj) -> {
            processDone.countDown();
        },
        null,
        BKException.Code.OK,
        BKException.Code.ReadException);
 
    try {
        processDone.await(1, TimeUnit.MINUTES);
        return ledgers;
    } catch (InterruptedException ie) {
        throw new RuntimeException(ie);
    }
}

このコードを要約してみましょう。これは、一見些細なタスクで予想されるよりも少し長くなります。 asyncProcessLedgers()メソッドには2つのコールバックが必要です

最初のものは、リスト内のすべての元帳IDを収集します。 このコールバックは複数のスレッドから呼び出すことができるため、ここでは同期リストを使用しています。 元帳IDに加えて、このコールバックはコールバックパラメータも受け取ります。 processResult()メソッドを呼び出して、データを処理したことを確認し、さらにデータを取得する準備ができたことを通知する必要があります。

2番目のコールバックは、すべての元帳がプロセッサコールバックに送信されたとき、または障害が発生したときに呼び出されます。 この例では、エラー処理を省略しています。 代わりに、 CountDownLatch をデクリメントしているだけです。これにより、 await 操作が終了し、メソッドが使用可能なすべての元帳のリストを返すことができます。

6. 結論

この記事では、Apache BookKeeperプロジェクトについて説明し、そのコアコンセプトを確認し、低レベルAPIを使用してLedgersにアクセスし、読み取り/書き込み操作を実行します。

いつものように、すべてのコードはGitHubを介して利用できます。