1. 序章

Apache Curator は、分散アプリケーションで人気のある調整サービスである ApacheZookeeperのJavaクライアントです。

このチュートリアルでは、キュレーターが提供する最も関連性の高い機能のいくつかを紹介します。

  • 接続管理–接続の管理とポリシーの再試行
  • 非同期–非同期機能とJava8ラムダの使用を追加することで既存のクライアントを強化します
  • 構成管理–システムを一元的に構成する
  • 強い型のモデル–型付きモデルの操作
  • レシピ–リーダー選出、分散ロックまたはカウンターの実装

2. 前提条件

まず、 ApacheZookeeperとその機能を簡単に確認することをお勧めします。

このチュートリアルでは、 127.0.0.1:2181で実行されているスタンドアロンのZookeeperインスタンスがすでに存在することを前提としています。 始めたばかりの場合は、ここにインストールして実行する方法の説明があります。

まず、curator-x-async依存関係をpom.xmlに追加する必要があります。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-async</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

最新バージョンのApacheCurator4.XXは、現在まだベータ版であるZookeeper3.5.Xとの強い依存関係があります。

そのため、この記事では、代わりに現在最新の安定版 Zookeeper3.4.11を使用します。

したがって、Zookeeperの依存関係を除外し、Zookeeperバージョンの依存関係をpom.xmlに追加する必要があります。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>

互換性の詳細については、このリンクを参照してください。

3. 接続管理

Apache Curatorの基本的なユースケースは、実行中のApacheZookeeperインスタンスに接続することです。

このツールは、再試行ポリシーを使用してZookeeperへの接続を構築するためのファクトリを提供します。

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
  maxRetries, sleepMsBetweenRetries);

CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);
client.start();
 
assertThat(client.checkExists().forPath("/")).isNotNull();

この簡単な例では、3回再試行し、接続に問題が発生した場合に再試行の間に100ミリ秒待機します。

CuratorFramework クライアントを使用してZookeeperに接続すると、パスを参照し、データを取得/設定し、基本的にサーバーと対話できるようになります。

4. 非同期

Curator Asyncモジュールは、上記のCuratorFrameworkクライアントをラップし、 CompletionStage Java 8APIを使用して非ブロッキング機能を提供します。

前の例が非同期ラッパーを使用してどのように見えるかを見てみましょう:

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy 
  = new RetryNTimes(maxRetries, sleepMsBetweenRetries);

CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);

client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);

AtomicBoolean exists = new AtomicBoolean(false);

async.checkExists()
  .forPath("/")
  .thenAcceptAsync(s -> exists.set(s != null));

await().until(() -> assertThat(exists.get()).isTrue());

現在、 checkExists()操作は非同期モードで機能し、メインスレッドをブロックしません。 代わりに、 CompletionStage APIを使用するthenAcceptAsync()メソッドを使用して、アクションを次々にチェーンすることもできます。

5. 構成管理

分散環境では、最も一般的な課題の1つは、多くのアプリケーション間で共有構成を管理することです。 Zookeeperを構成を保持するデータストアとして使用できます。

ApacheCuratorを使用してデータを取得および設定する例を見てみましょう。

CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";

client.create().forPath(key);

async.setData()
  .forPath(key, expected.getBytes());

AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
  .forPath(key)
  .thenAccept(data -> isEquals.set(new String(data).equals(expected)));

await().until(() -> assertThat(isEquals.get()).isTrue());

この例では、ノードパスを作成し、Zookeeperにデータを設定してから、値が同じであることを確認してデータを復元します。 key フィールドは、 / config / dev /my_keyのようなノードパスである可能性があります。

5.1. ウォッチャー

Zookeeperのもう1つの興味深い機能は、キーまたはノードを監視する機能です。 再デプロイせずに、構成の変更をリッスンし、アプリケーションを更新できます

ウォッチャーを使用した場合の上記の例を見てみましょう。

CuratorFramework client = newClient()
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";

async.create().forPath(key);

List<String> changes = new ArrayList<>();

async.watched()
  .getData()
  .forPath(key)
  .event()
  .thenAccept(watchedEvent -> {
    try {
        changes.add(new String(client.getData()
          .forPath(watchedEvent.getPath())));
    } catch (Exception e) {
        // fail ...
    }});

// Set data value for our key
async.setData()
  .forPath(key, expected.getBytes());

await()
  .until(() -> assertThat(changes.size()).isEqualTo(1));

ウォッチャーを構成し、データを設定してから、ウォッチされたイベントがトリガーされたことを確認します。 1つのノードまたはノードのセットを一度に監視できます。

6. 強く型付けされたモデル

Zookeeperは主にバイト配列を処理するため、データをシリアル化および逆シリアル化する必要があります。 これにより、シリアル化可能なインスタンスを柔軟に操作できますが、保守が難しい場合があります。

ここで役立つように、Curatorは型付きモデルの概念を追加します。これはシリアル化/逆シリアル化を委任し、型を直接操作できるようにします。 それがどのように機能するか見てみましょう。

まず、シリアライザーフレームワークが必要です。 CuratorはJackson実装の使用を推奨しているので、Jackson依存関係pom.xmlに追加しましょう。

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.0</version>
</dependency>

それでは、カスタムクラスHostConfigを永続化してみましょう。

public class HostConfig {
    private String hostname;
    private int port;

    // getters and setters
}

HostConfig クラスからパスへのモデル仕様マッピングを提供し、ApacheCuratorによって提供されるモデル化されたフレームワークラッパーを使用する必要があります。

ModelSpec<HostConfig> mySpec = ModelSpec.builder(
  ZPath.parseWithIds("/config/dev"), 
  JacksonModelSerializer.build(HostConfig.class))
  .build();

CuratorFramework client = newClient();
client.start();

AsyncCuratorFramework async 
  = AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient 
  = ModeledFramework.wrap(async, mySpec);

modeledClient.set(new HostConfig("host-name", 8080));

modeledClient.read()
  .whenComplete((value, e) -> {
     if (e != null) {
          fail("Cannot read host config", e);
     } else {
          assertThat(value).isNotNull();
          assertThat(value.getHostname()).isEqualTo("host-name");
          assertThat(value.getPort()).isEqualTo(8080);
     }
   });

パス/config / devを読み取るときにwhenComplete()メソッドは、ZookeeperにHostConfigインスタンスを返します。

7. レシピ

Zookeeperは、このガイドラインを提供して、リーダー選出、分散ロック、共有カウンターなどの高レベルのソリューションまたはレシピを実装します。

Apache Curatorは、これらのレシピのほとんどの実装を提供します。 完全なリストを表示するには、キュレーターレシピのドキュメントにアクセスしてください。

これらのレシピはすべて、別のモジュールで利用できます。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

すぐに飛び込んで、いくつかの簡単な例でこれらを理解し始めましょう。

7.1. リーダー選出

分散環境では、複雑なジョブを調整するために1つのマスターノードまたはリーダーノードが必要になる場合があります。

キュレーターでのリーダー選出レシピの使用法は次のようになります。

CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client, 
  "/mutex/select/leader/for/job/A", 
  new LeaderSelectorListener() {
      @Override
      public void stateChanged(
        CuratorFramework client, 
        ConnectionState newState) {
      }

      @Override
      public void takeLeadership(
        CuratorFramework client) throws Exception {
      }
  });

// join the members group
leaderSelector.start();

// wait until the job A is done among all members
leaderSelector.close();

リーダーセレクターを起動すると、ノードはパス / mutex / select / Leader / for / job /A内のメンバーグループに参加します。 ノードがリーダーになると、 takeLeadership メソッドが呼び出され、リーダーとしての私たちがジョブを再開できます。

7.2. 共有ロック

共有ロックレシピは、完全に分散されたロックを持つことについてです。

CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
  client, "/mutex/process/A");

sharedLock.acquire();

// do process A

sharedLock.release();

ロックを取得すると、Zookeeperは、同じロックを同時に取得する他のアプリケーションがないことを確認します。

7.3. カウンター

カウンターレシピは、すべてのクライアント間で共有される整数を調整します。

CuratorFramework client = newClient();
client.start();

SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();

counter.setCount(counter.getCount() + 1);

assertThat(counter.getCount()).isEqualTo(1);

この例では、ZookeeperはInteger値をパス/counters / A に格納し、パスがまだ作成されていない場合は値を0に初期化します。

8. 結論

この記事では、ApacheCuratorを使用してApacheZookeeperに接続し、その主な機能を利用する方法について説明しました。

Curatorの主なレシピもいくつか紹介しました。

いつものように、ソースはGitHubにあります。