1. 概要

Apache Kafka は、Dockerで頻繁に使用される非常に人気のあるイベントストリーミングプラットフォームです。 多くの場合、特にクライアントが同じDockerネットワークまたは同じホストで実行されていない場合、Kafkaとの接続確立の問題が発生します。 これは主に、Kafkaのアドバタイズされたリスナーの設定ミスが原因です。

このチュートリアルでは、クライアントがDocker内で実行されているKafkaブローカーに接続できるようにリスナーを構成する方法を学習します。

2. Kafkaをセットアップする

接続を確立する前に、Dockerを使用してKafkaブローカーを実行する必要があります。 docker-compose.yamlファイルのスニペットは次のとおりです。

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    networks: 
      - kafka_network
    ...
  
  kafka:
    container_name: kafka
    networks: 
      - kafka_network
    ports:
      - 29092:29092
    environment:
      KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,INTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
     ... 

networks:
  kafka_network:
    name: kafka_docker_example_net

ここでは、KafkaとZookeeperの2つの必須サービスを定義しました。 カスタムネットワークも定義しました– kafka_docker_example_net、 私たちのサービスが使用する

KAFKA_LISTENERS KAFKA_ADVERTISED_LISTENERS、、およびKAFKA_LISTENER_SECURITY_PROTOCOL_MAPプロパティについては後で詳しく説明します。

上記のdocker-compose.yamlファイルを使用して、サービスを開始します。

docker-compose up -d
Creating network "kafka_docker_example_net" with the default driver
Creating zookeeper ... done
Creating kafka ... done

また、Kafka コンソールプロデューサーユーティリティをサンプルクライアントとして使用して、Kafkaブローカーへの接続をテストします。 DockerなしでKafka-console-producerスクリプトを使用するには、Kafkaをダウンロードする必要があります。

3. リスナー

リスナー、アドバタイズされたリスナー、およびリスナープロトコルは、Kafkaブローカーと接続するときに重要な役割を果たします。

KAFKA_LISTENERS プロパティを使用してリスナーを管理します。このプロパティでは、ブローカーが着信TCP接続をリッスンする必要があるソケットを指定するURIのコンマ区切りリストを宣言します。

各URIは、プロトコル名と、それに続くインターフェイスアドレスおよびポートで構成されます。

EXTERNAL_SAME_HOST://0.0.0.0:29092,INTERNAL://0.0.0.0:9092

ここでは、 0.0.0.0 メタアドレスを指定して、ソケットをすべてのインターフェイスにバインドしました。 さらに、EXTERNAL_SAME_HOSTおよびINTERNALは、URI形式でリスナーを定義するときに指定する必要があるカスタムリスナー名です。

3.2. ブートストラップ

初期接続の場合、Kafkaクライアントには、ブローカーのアドレスを指定するブートストラップサーバーリストが必要です。 リストには、クラスター内のランダムブローカーへの有効なアドレスが少なくとも1つ含まれている必要があります。

クライアントはそのアドレスを使用してブローカーに接続します。 接続が成功すると、ブローカーは、クラスター内のすべてのブローカーのアドバタイズされたリスナーリストを含む、クラスターに関するメタデータを返します。 以降の接続では、クライアントはそのリストを使用してブローカーに到達します。

3.3. アドバタイズされたリスナー

リスナーを宣言するだけでは、ブローカーのソケット構成にすぎないため、十分ではありません。 クライアント(消費者と生産者)にKafkaへの接続方法を伝える方法が必要です。

これは、アドバタイズされたリスナーがKAFKA_ADVERTISED_LISTENERSプロパティの助けを借りて登場する場所です。 リスナーのプロパティと同様の形式です。

://

クライアントは、最初のブートストラッププロセスの後に、アドバタイズされたリスナーとして指定されたアドレスを使用します。

3.4. リスナーセキュリティプロトコルマップ

リスナーとアドバタイズされたリスナーとは別に、Kafkaに接続するときに使用するセキュリティプロトコルについてクライアントに通知する必要があります。 KAFKA_LISTENER_SECURITY_PROTOCOL_MAPでは、カスタムプロトコル名を有効なセキュリティプロトコルにマップします。

前のセクションの構成では、2つのカスタムプロトコル名を宣言しました– 内部 EXTERNAL_SAME_HOST。 必要に応じて名前を付けることができますが、有効なセキュリティプロトコルにマップする必要があります。

指定したセキュリティプロトコルの1つはPLAINTEXTです。これは、クライアントがKafkaブローカーで認証する必要がないことを意味します。 また、交換されるデータは暗号化されていません。

4. 同じDockerネットワークから接続しているクライアント

別のコンテナーからKafkaコンソールプロデューサーを起動し、ブローカーへのメッセージを生成してみましょう。

docker run -it --rm --network kafka_docker_example_net confluentinc/cp-kafka /bin/kafka-console-producer --bootstrap-server kafka:9092 --topic test_topic
>hello
>world

ここでは、このコンテナーを既存の kafka_docker_example_net ネットワークに接続して、ブローカーと自由に通信できるようにしています。 また、ブローカーのアドレス– kafka:9092 と、自動的に作成されるトピックの名前も指定します。

トピックへのメッセージを生成できました。これは、ブローカーへの接続が成功したことを意味します。

5. 同じホストから接続しているクライアント

クライアントがコンテナ化されていないときに、ホストマシンからブローカーに接続しましょう。 外部接続については、 EXTERNAL_SAME_HOST リスナーをアドバタイズしました。これを使用して、ホストからの接続を確立できます。 アドバタイズされたリスナープロパティから、Kafkaブローカーに到達するには localhost:29092アドレスを使用する必要があることがわかります。

同じホストからの接続をテストするには、DockerizedされていないKafkaコンソールプロデューサーを使用します。

kafka-console-producer --bootstrap-server localhost:29092 --topic test_topic_2
>hi
>there

トピックを作成できたので、最初のブートストラップとその後のブローカーへの接続(アドバタイズされたリスナーがクライアントによって使用される場合)の両方が成功したことを意味します。

以前にdocker-compose.yamlで構成したポート番号29092により、KafkaブローカーはDockerの外部で到達可能になりました。

6. 別のホストから接続しているクライアント

Kafkaブローカーが別のホストマシンで実行されている場合、どのように接続しますか? 残念ながら、既存のリスナーは同じDockerネットワークまたはホスト接続専用であるため、再利用することはできません。 したがって、代わりに、新しいリスナーを定義してアドバタイズする必要があります。

KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT

と呼ばれる新しいリスナーを作成しました EXTERNAL_DIFFERENT_HOST セキュリティプロトコル付き平文および関連付けられたポート29093 KAFKA_ADVERTISED_LISTENERS 、Kafkaが実行されているクラウドマシンのIPアドレスも追加しました。

別のマシン(この場合はローカルワークステーション)から接続しているため、localhostを使用できないことに注意する必要があります。 また、ポート29093はポートセクションの下に公開されているため、Dockerの外部からアクセスできます。

いくつかのメッセージを作成してみましょう。

kafka-console-producer --bootstrap-server 157.245.80.232:29093 --topic test_topic_3
>hello
>REMOTE SERVER

Kafkaブローカーに接続し、メッセージを正常に生成できたことがわかります。

7. 結論

この記事では、クライアントがDocker内で実行されているKafkaブローカーに接続できるようにリスナーを構成する方法を学びました。 クライアントが同じDockerネットワーク、同じホスト、異なるホストなどで実行されているさまざまなシナリオを検討しました。 リスナー、アドバタイズされたリスナー、およびセキュリティプロトコルマップの構成によって接続が決まることがわかりました。