1. 概要

Apache Kafkaクラスターを使用するイベント駆動型システムを監視するには、多くの場合、アクティブなブローカーのリストを取得する必要があります。 このチュートリアルでは、実行中のクラスター内のアクティブなブローカーのリストを取得するために、いくつかのシェルコマンドについて説明します。

2. 設定

この記事の目的のために、以下の docker-compose.ymlファイルを使用して、2ノードのKafkaクラスターをセットアップしましょう。

$ cat docker-compose.yml
---
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  
  kafka-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

次に、docker-composeコマンドを使用してKafkaクラスターをスピンアップしましょう。

$ docker-compose up -d

Zookeeperサーバーがポート2181でリッスンしているのに対し、Kafkaブローカーはポート 2909239092、でそれぞれリッスンしていることを確認できます。

$ ports=(2181 29092 39092)
$ for port in $ports
do
nc -z localhost $port
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!

3. ZookeeperAPIの使用

Kafkaクラスターでは、 Zookeeperサーバーは、Kafkaブローカーサーバーに関連するメタデータを格納します。 それでは、Zookeeperによって公開されているファイルシステムAPIを使用して、ブローカーの詳細を取得しましょう。

3.1. zookeeper-shellコマンド

ほとんどのKafkaディストリビューションには、zookeeper-shellまたはzookeeper-shell.shバイナリが付属しています。 したがって、このバイナリを使用してZookeeperサーバーと対話することは事実上の標準です。

まず、 localhost:2181で実行されているZookeeperサーバーに接続しましょう。

$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!

Zookeeperサーバーに接続したら、 lsなどの一般的なファイルシステムコマンドを実行して、サーバーに保存されているメタデータ情報を取得できます。 現在生きているブローカーのIDを見つけましょう:

ls /brokers/ids
[1, 2]

現在、ID1と2の2つのアクティブなブローカーがあることがわかります。 get コマンドを使用すると、特定のIDを持つ特定のブローカーの詳細を取得できます。

get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}

id =1のブローカーはポート29092でリッスンし、 id =2の2番目のブローカーはポート39092でリッスンしていることに注意してください。 ]。

最後に、Zookeeperシェルを終了するには、quitコマンドを使用できます。

quit

3.2. zkCliコマンド

Kafkaディストリビューションがzookeeper-shellバイナリで出荷されるのと同様に、ZookeeperディストリビューションはzkCliまたはzkCli.shバイナリで出荷されます。

そのため、 zkCliとのやり取りは、zookeeper-shell とのやり取りとまったく同じです。次に、 id =1を使用してブローカーに必要な詳細を取得できることを確認しましょう。 :

$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}

予想どおり、 zookeeper-shell を使用してフェッチされたブローカーの詳細は、zkCliを使用して取得されたブローカーの詳細と一致することがわかります。

4. ブローカーバージョンAPIの使用

場合によっては、アクティブなブローカーのリストが不完全で、クラスター内で使用可能なすべてのブローカー取得したいことがあります。 このようなシナリオでは、Kafkaディストリビューションに付属のkafka-broker-api-versionsコマンド使用できます。

localhost:29092 で実行されているブローカーについて知っていると仮定して、Kafkaクラスターに参加しているすべてのアクティブなブローカーを見つけてみましょう。

$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092

awk コマンドを使用して出力をフィルタリングし、ブローカーアドレスのみを表示したことは注目に値します。 さらに、結果は、クラスター内に2つのアクティブなブローカーがあることを正しく示しています。

このアプローチはZookeeperCLIアプローチよりも単純に見えますが、 kafka-broker-api-versionsbinaryはKafkaディストリビューションに最近追加されたものにすぎません。

5. シェルスクリプト

実際のシナリオでは、ブローカーごとにzkCliまたはzookeeper-shellコマンドを手動で実行すると、負担がかかります。 それでは、Zookeeperサーバーアドレスを入力として受け取り、その代わりにすべてのアクティブなブローカーのリストを提供するシェルスクリプトを作成しましょう。

5.1. ヘルパー関数

すべてのヘルパー関数をfunctions.shスクリプトに記述してみましょう。

$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"

# Helper Functions Below

まず、 get_broker_ids関数を記述して、zkCliコマンドを内部的に呼び出すアクティブなブローカーIDのセットを取得しましょう。

function get_broker_ids {
broker_ids_out=$(zkCli -server $ZOOKEEPER_SERVER <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /'
}

次に、 get_broker_details関数を記述して、broker_idを使用して詳細なブローカーの詳細を取得します。

function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}

詳細なブローカーの詳細がわかったので、 parse_broker_endpoint 関数を記述して、ブローカーのエンドポイントの詳細を取得しましょう。

function parse_endpoint_detail {
broker_detail="$1"
json="$(echo "$broker_detail"  | grep '^{.*}$')"
json_endpoints="$(echo $json | jq .endpoints)"
echo "$(echo $json_endpoints |jq . |  grep HOST | tr -d " ")"
}

内部的には、JSON解析jqコマンドを使用しました。

5.2. メインスクリプト

それでは、features.shで定義されたヘルパー関数を使用するメインスクリプトget_all_active_brokers.shを記述しましょう。

$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"

function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
    broker_details="$(get_broker_details $broker_id)"
    broker_endpoint=$(parse_endpoint_detail "$broker_details")
    echo "broker_id="$broker_id,"endpoint="$broker_endpoint
done
}

get_all_active_brokers

get_all_active_brokers関数のすべてのbroker_idsを繰り返して、すべてのアクティブなブローカーのエンドポイントを集約していることがわかります。

最後に、 get_all_active_brokers.sh スクリプトを実行して、2ノードのKafkaクラスターのアクティブなブローカーのリストを確認しましょう。

$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"

結果が正確であることがわかります。 釘付けにしたようです!

6. 結論

このチュートリアルでは、アクティブなブローカーのリストを取得するために、 zookeeper-shell zkCli kafka-broker-api-versionsなどのシェルコマンドについて学習しました。 Kafkaクラスター内。 さらに、実際のシナリオでブローカーの詳細を見つけるプロセスを自動化するシェルスクリプトを作成しました。