1. 概要

このクイックチュートリアルでは、Kafkaの消費者グループを一覧表示する方法と、その詳細を確認する方法を学習します。

2. 前提条件

このチュートリアルの例を実行するには、リクエストを送信するKafkaクラスターが必要です。 これは、実稼働環境で実行されている本格的なKafkaクラスターの場合もあれば、テスト固有のシングルインスタンスKafkaクラスターの場合もあります。

簡単にするために、ローカルホストの2181ポートをリッスンする Zookeeper インスタンスを使用して、ポート9092をリッスンするシングルノードクラスターがあると想定します。

さらに、Kafkaインストールディレクトリからすべてのサンプルコマンドを実行していることに注意してください。

3. トピックとコンシューマーの追加

特定のKafkaクラスターのコンシューマーを一覧表示する前に、 kafka-topics.sh シェルスクリプトを使用して、最初にいくつかのトピックを追加しましょう。

$ ./bin/kafka-topics.sh --create --topic users.registrations --replication-factor 1 \ 
  --partitions 2 --zookeeper localhost:2181
$ ./bin/kafka-topics.sh --create --topic users.verfications --replication-factor 1 \ 
  --partitions 2 --zookeeper localhost:2181

ここで、いくつかのコンシューマーグループも追加する必要があります。 最も簡単な方法は、Kafkaディストリビューションにバンドルされているコンソールコンシューマーを使用することです。

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users.registrations --group new-user
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users.registrations --group new-user

ここでは、 kafka-console-consumer.sh シェルスクリプトを使用して、同じトピックをリッスンしている2人のコンシューマーを追加しました。 これらのコンシューマーは同じグループに属しているため、トピックパーティションからのメッセージはグループのメンバー全体に分散されます。 このようにして、Kafkaで競合するコンシューマーパターンを実装できます。

別のトピックからも消費しましょう:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users.verifications

コンシューマーのグループを指定しなかったため、コンソールコンシューマーは、それ自体を唯一のメンバーとして新しいグループを作成しました。

この新しいグループについては、次のセクションで説明します。ここでは、Kafkaクラスターでコンシューマーとコンシューマーグループを一覧表示する方法を学習します。

4. 消費者のリスト

Kafkaクラスター内のコンシューマーを一覧表示するには、kafka-consumer-groups.shシェルスクリプトを使用できます。 –listオプションは、すべてのコンシューマーグループを一覧表示します

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
new-user
console-consumer-40123

–list オプションに加えて、 –bootstrap-serverオプションを渡してKafkaクラスターアドレスを指定します。 2つのグループに3つの個別の消費者がいるため、結果には2つのグループのみが含まれます。

最初のグループのメンバーを見るには 、使用できます “-グループ –describe –members “ オプション:

$ ./bin/kafka-consumer-groups.sh --describe --group new-user --members --bootstrap-server localhost:9092
GROUP           CONSUMER-ID                    HOST            CLIENT-ID            #PARTITIONS
new-user        consumer-new-user-1-b90...     /127.0.0.1      consumer-new-user-1  1
new-user        consumer-new-user-1-af8...     /127.0.0.1      consumer-new-user-1  1

ここでは、 new-user グループに2つの個別のコンシューマーがあり、それぞれが1つのパーティションから消費していることがわかります。

–members オプションを省略すると、グループ内のコンシューマー、それぞれがリッスンしているパーティション番号、およびそれらのオフセットが一覧表示されます。

$ ./bin/kafka-consumer-groups.sh --describe --group new-user --bootstrap-server localhost:9092
GROUP           TOPIC                       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG          
new-user        users.registrations         1          3               3               0              
new-user        users.registrations         0          5               5               0            

このコマンドには、クラスターまたはブートストラップサーバーのアドレスが必要であることに注意してください。 クラスター接続情報を省略すると、シェルスクリプトはエラーをスローします

$ ./bin/kafka-consumer-groups.sh --list
Missing required argument "[bootstrap-server]"
// truncated

5. 結論

この短いチュートリアルでは、最初にいくつかのKafkaトピックと消費者グループを追加しました。 次に、消費者グループを一覧表示し、各グループの詳細を表示する方法を学びました。