1. 概要

このチュートリアルでは、 Apache Kafka を簡単に紹介し、Kafkaクラスターでトピックをプログラムで作成および構成する方法を確認します。

2. カフカ入門

Apache Kafkaは、強力で高性能な分散型イベントストリーミングプラットフォームです。

一般に、プロデューサーアプリケーションはイベントをKafkaに公開し、コンシューマーはイベントを読み取って処理するためにこれらのイベントをサブスクライブします。 Kafkaは、 トピックを使用して、これらのイベントを保存および分類します。たとえば、eコマースアプリケーションでは、「注文」トピックが存在する可能性があります。

Kafkaトピックはパーティション化されており、スケーラビリティのために複数のブローカーにデータを分散します。 データをフォールトトレラントで高可用性にするために、これらを複製できます。 トピックは、消費後も必要な限りイベントを保持します。 これはすべて、Kafkaコマンドラインツールとキー値構成を介してトピックごとに管理されます。

ただし、コマンドラインツールに加えて、 Kafkaは、トピック、ブローカー、およびその他のKafkaオブジェクトを管理および検査するための管理APIも提供します。 この例では、このAPIを使用して新しいトピックを作成します。

3. 依存関係

Admin APIを使用するには、 kafka -clientsdependentencyをpom.xmlに追加しましょう。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

4. Kafkaのセットアップ

新しいトピックを作成する前に、少なくとも単一ノードのKafkaクラスターが必要です。

このチュートリアルでは、 Testcontainers フレームワークを使用して、Kafkaコンテナーをインスタンス化します。 次に、実行中の外部Kafkaサーバーに依存しない、信頼性の高い自己完結型の統合テストを実行できます。 このために、特にテスト用にさらに2つの依存関係が必要になります。

まず、TestcontainersKafka依存関係pom.xmlに追加しましょう。

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

次に、JUnit5を使用してTestcontainerテストを実行するためのjunit-jupiterアーティファクトを追加します。

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

必要なすべての依存関係を構成したので、プログラムで新しいトピックを作成するための簡単なアプリケーションを作成できます。

5. 管理API

ローカルブローカーの最小限の構成で新しいPropertiesインスタンスを作成することから始めましょう。

Properties properties = new Properties();
properties.put(
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);

これで、Adminインスタンスを取得できます。

Admin admin = Admin.create(properties)

create メソッドは、 Properties オブジェクト(または Map) bootstrap.servers プロパティを受け入れ、スレッドセーフなインスタンスを返します。

管理クライアントはこのプロパティを使用して、クラスター内のブローカーを検出し、その後、管理操作を実行します。 そのため、一部のインスタンスが使用できない可能性をカバーするには、通常、2つまたは3つのブローカーアドレスを含めるだけで十分です。

AdminClientConfig クラスには、すべてのadminクライアント構成エントリの定数が含まれています。

6. トピックの作成

Testcontainersを使用してJUnit5テストを作成し、トピックの作成が成功したことを確認することから始めましょう。 ConfluentOSSプラットフォームの公式KafkaDockerイメージを使用するKafkaモジュールを利用します。

@Test
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
    kafkaTopicApplication.createTopic("test-topic");

    String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
    String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
      .getStdout();

    assertThat(stdout).contains("test-topic");
}

ここで、Testcontainersは、テストの実行中にKafkaコンテナを自動的にインスタンス化して管理します。 アプリケーションコードを呼び出して、実行中のコンテナでトピックが正常に作成されたことを確認するだけです。

6.1. デフォルトオプションで作成

トピックパーティションとレプリケーションファクターは、新しいトピックの重要な考慮事項です。 物事をシンプルに保ち、1つのパーティションと1のレプリケーション係数でサンプルトピックを作成します。

try (Admin admin = Admin.create(properties)) {
    int partitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
    
    CreateTopicsResult result = admin.createTopics(
      Collections.singleton(newTopic)
    );

    KafkaFuture<Void> future = result.values().get(topicName);
    future.get();
}

ここでは、 Admin.createTopicsメソッドを使用して、デフォルトのオプションで新しいトピックのバッチを作成しました。 Admin インターフェイスは、 AutoCloseable インターフェイスを拡張するため、 try-with-resourcesを使用して操作を実行しました。 これにより、リソースが適切に解放されます。

重要なことに、このメソッドはController Brokerと通信し、非同期で実行されます。 返されたCreateTopicsResultオブジェクトは、リクエストバッチ内の各アイテムの結果にアクセスするためのKafkaFutureを公開します。 これはJava非同期プログラミングパターンに従い、呼び出し元がFuture.getメソッドを使用して操作の結果を取得できるようにします。

同期動作の場合、このメソッドをすぐに呼び出して、操作の結果を取得できます。 これは、操作が完了するか失敗するまでブロックされます。 失敗した場合、根本的な原因をラップするExecutionExceptionが発生します。

6.2. オプションで作成

デフォルトのオプションの代わりに、Admin.createTopicsメソッドのオーバーロード形式を使用して、CreateTopicsOptionsオブジェクトを介していくつかのオプションを提供することもできます。 これらを使用して、新しいトピックを作成するときの管理クライアントの動作を変更できます。

CreateTopicsOptions topicOptions = new CreateTopicsOptions()
  .validateOnly(true)
  .retryOnQuotaViolation(false);

CreateTopicsResult result = admin.createTopics(
  Collections.singleton(newTopic), topicOptions
);

ここでは、 validateOnly オプションをtrueに設定しました。これは、クライアントが実際にトピックを作成せずにのみ検証することを意味します。 同様に、 retryOnQuotaViolation オプションはfalseに設定されているため、クォータ違反が発生した場合に操作が再試行されません。

6.3. 新しいトピック構成

Kafkaには、データ保持や圧縮など、トピックの動作を制御するさまざまなトピック構成があります。 これらには、サーバーのデフォルト値とオプションのトピックごとのオーバーライドの両方があります。

新しいトピックの構成マップを使用して、トピック構成を提供できます。

// Create a compacted topic with 'lz4' compression codec
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");

NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
  .configs(newTopicConfig);

AdminAPIのTopicConfigクラスには、作成時にトピックを構成するために使用できるキーが含まれています。

7. その他のトピック操作

Admin APIには、新しいトピックを作成する機能に加えて、トピックを削除、一覧表示、および説明する操作もあります。 これらのトピック関連の操作はすべて、トピックの作成で見たのと同じパターンに従います。

これらの各操作メソッドには、xxxTopicOptionsオブジェクトを入力として受け取るオーバーロードバージョンがあります。 これらのメソッドはすべて、対応するxxxTopicsResultオブジェクトを返します。 これにより、非同期操作の結果にアクセスするためのKafkaFutureが提供されます。

最後に、Kafkaバージョン0.11.0.0での導入以来、 InterfaceStability.Evolveing アノテーションで示されているように、管理APIはまだ進化していることにも言及する価値があります。 これは、APIが将来変更される可能性があり、マイナーリリースが互換性を損なう可能性があることを意味します。

8. 結論

このチュートリアルでは、Java管理クライアントを使用してKafkaで新しいトピックを作成する方法を見てきました。

最初に、デフォルトでトピックを作成し、次に明示的なオプションでトピックを作成しました。 これに続いて、さまざまなプロパティを使用して新しいトピックを構成する方法を確認しました。 最後に、管理クライアントを使用した他のトピック関連の操作について簡単に説明しました。

その過程で、Testcontainersを使用して、テストから単純な単一ノードクラスターをセットアップする方法も確認しました。

いつものように、記事の完全なソースコードは、GitHubから入手できます。