1. 概要

Kafkaコンシューマーグループのラグは、Kafkaベースのイベント駆動型システム主要業績評価指標です。

このチュートリアルでは、Kafkaのコンシューマーラグを監視するアナライザーアプリケーションを構築します。

2. 消費者の遅れ

コンシューマーラグは、ログ内のコンシューマーの最後にコミットされたオフセットとプロデューサーの終了オフセットの間のデルタです。 言い換えると、コンシューマーラグは、プロデューサー-コンシューマーシステムでメッセージを生成してから消費するまでの遅延を測定します。

このセクションでは、オフセット値を決定する方法を理解しましょう。

2.1. Kafka AdminClient

コンシューマーグループのオフセット値を検査するには、管理用のKafkaクライアントが必要です。 それでは、 LagAnalyzerService クラスにメソッドを記述して、AdminClientクラスのインスタンスを作成しましょう。

private AdminClient getAdminClient(String bootstrapServerConfig) {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    return AdminClient.create(config);
}

プロパティファイルからブートストラップサーバーリストを取得するには、@Valueアノテーションを使用していることに注意する必要があります。 同様に、このアノテーションを使用して、groupIdやtopicNameなどの他の値を取得します。

2.2. 消費者グループオフセット

まず、AdminClientクラスのlistConsumerGroupOffsets()メソッドを使用して、特定のコンシューマーグループIDのオフセット情報をフェッチできます。

次に、主にオフセット値に焦点を当てているため、 partitionsToOffsetAndMetadata()メソッドを呼び出して、TopicPartitionとのマップを取得できます。 ResetAndMetadata 値:

private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId) 
  throws ExecutionException, InterruptedException {
    ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
    Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();

    Map<TopicPartition, Long> groupOffset = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndMetadata metadata = entry.getValue();
        groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
    }
    return groupOffset;
}

最後に、 topicPartitionOffsetAndMetadataMap の反復に気づき、フェッチされた結果を各トピックおよびパーティションごとのオフセット値に制限します。

2.3. プロデューサーオフセット

コンシューマーグループのラグを見つけるために残された唯一のことは、エンドオフセット値を取得する方法です。 このために、 KafkaConsumerクラスのendOffsets()メソッドを使用できます。

LagAnalyzerServiceクラスにKafkaConsumerクラスのインスタンスを作成することから始めましょう。

private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(properties);
}

次に、ラグを計算する必要があるコンシューマーグループオフセットからの関連するすべてのTopicPartition値を集計して、 endOffsets()メソッドの引数として提供します。

private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
    List<TopicPartition> topicPartitions = new LinkedList<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
        TopicPartition key = entry.getKey();
        topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
    }
    return kafkaConsumer.endOffsets(topicPartitions);
}

最後に、コンシューマーオフセットとプロデューサーの endoffsets を使用して、各TopicPartitionのラグを生成するメソッドを作成しましょう。

private Map<TopicPartition, Long> computeLags(
  Map<TopicPartition, Long> consumerGrpOffsets,
  Map<TopicPartition, Long> producerOffsets) {
    Map<TopicPartition, Long> lags = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
        Long producerOffset = producerOffsets.get(entry.getKey());
        Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
        long lag = Math.abs(producerOffset - consumerOffset);
        lags.putIfAbsent(entry.getKey(), lag);
    }
    return lags;
}

3. ラグアナライザー

それでは、 LagAnalyzerServiceクラスにanalyzeLag()メソッドを記述して、ラグ分析を調整しましょう。

public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
    Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
    Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
    for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
        String topic = lagEntry.getKey().topic();
        int partition = lagEntry.getKey().partition();
        Long lag = lagEntry.getValue();
        System.out.printf("Time=%s | Lag for topic = %s, partition = %s is %d\n",
          MonitoringUtil.time(),
          topic,
          partition,
          lag);
    }
}

ただし、ラグメトリックの監視に関しては、システムパフォーマンスを回復するための管理アクションを実行できるように、ほぼリアルタイムのラグ値が必要です

これを実現する簡単な方法の1つは、一定の時間間隔でラグ値をポーリングすることです。 それでは、 LagAnalyzerService: analysiseLag()メソッドを呼び出すLiveLagAnalyzerServiceサービスを作成しましょう。

@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
    lagAnalyzerService.analyzeLag(groupId);
}

この目的のために、 @Scheduled アノテーションを使用して、ポーリング頻度を5秒に設定しました。 ただし、リアルタイムの監視では、おそらくJMXを介してこれにアクセスできるようにする必要があります。

4. シミュレーション

このセクションでは、ローカルのKafkaセットアップのKafkaプロデューサーとコンシューマーをシミュレートして、外部のKafkaプロデューサーとコンシューマーに依存せずにLagAnalyzerの動作を確認できるようにします。

4.1. シミュレーションモード

シミュレーションモードはデモンストレーション目的にのみ必要であるため、実際のシナリオでLag Analyzerアプリケーションを実行する場合は、これをオフにするメカニズムが必要です。

これは、application.propertiesリソースファイルで構成可能なプロパティとして保持できます。

monitor.producer.simulate=true
monitor.consumer.simulate=true

これらのプロパティをKafkaのプロデューサーとコンシューマーに接続し、それらの動作を制御します。

さらに、プロデューサー startTime endTime、およびヘルパーメソッド time()を定義して、監視中の現在の時刻を取得しましょう。

public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

public static String time() {
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    LocalDateTime now = LocalDateTime.now();
    String date = dtf.format(now);
    return date;
}

4.2. プロデューサー-コンシューマー構成

Kafkaコンシューマーおよびプロデューサーシミュレーターのインスタンスをインスタンス化するために、いくつかのコア構成値を定義する必要があります。

まず、KafkaConsumerConfigクラスでコンシューマーシミュレーターの構成を定義しましょう。

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    if (enabled) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    } else {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
    }
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    if (enabled) {
        factory.setConsumerFactory(consumerFactory(groupId));
    } else {
        factory.setConsumerFactory(consumerFactory(simulateGroupId));
    }
    return factory;
}

次に、KafkaProducerConfigクラスでプロデューサーシミュレーターの構成を定義できます。

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

さらに、 @KafkaListener アノテーションを使用して、ターゲットリスナーを指定しましょう。これは、もちろん、monitor.consumer.simulatetrueに設定されている場合にのみ有効になります。 :

@KafkaListener(
  topics = "${monitor.topic.name}",
  containerFactory = "kafkaListenerContainerFactory",
  autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
    Thread.sleep(10L);
}

そのため、10ミリ秒のスリープ時間を追加して、人為的な消費者ラグを作成しました。

最後に、プロデューサーをシミュレートする sendMessage()メソッドを作成しましょう。

@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
    if (enabled) {
        if (endTime.after(new Date())) {
            String message = "msg-" + time();
            SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
        }
    }
}

プロデューサーが1メッセージ/ミリ秒の速度でメッセージを生成することがわかります。 また、シミュレーションのstartTime後の30秒endTime以降はメッセージの生成を停止します。

4.3. ライブモニタリング

それでは、 LagAnalyzerApplication:でmainメソッドを実行してみましょう。

public static void main(String[] args) {
    SpringApplication.run(LagAnalyzerApplication.class, args);
    while (true) ;
}

30秒ごとに、トピックの各パーティションの現在のラグが表示されます。

Time=2021/06/06 11:07:24 | Lag for topic = baeldungTopic, partition = 0 is 93
Time=2021/06/06 11:07:29 | Lag for topic = baeldungTopic, partition = 0 is 290
Time=2021/06/06 11:07:34 | Lag for topic = baeldungTopic, partition = 0 is 776
Time=2021/06/06 11:07:39 | Lag for topic = baeldungTopic, partition = 0 is 1159
Time=2021/06/06 11:07:44 | Lag for topic = baeldungTopic, partition = 0 is 1559
Time=2021/06/06 11:07:49 | Lag for topic = baeldungTopic, partition = 0 is 2015
Time=2021/06/06 11:07:54 | Lag for topic = baeldungTopic, partition = 0 is 1231
Time=2021/06/06 11:07:59 | Lag for topic = baeldungTopic, partition = 0 is 731
Time=2021/06/06 11:08:04 | Lag for topic = baeldungTopic, partition = 0 is 231
Time=2021/06/06 11:08:09 | Lag for topic = baeldungTopic, partition = 0 is 0

そのため、プロデューサーがメッセージを生成する速度は1メッセージ/ミリ秒であり、これはコンシューマーがメッセージを消費する速度よりも高くなります。 したがって、 lagは最初の30秒間ビルドを開始し、その後プロデューサーは生産を停止するため、lagは徐々に0に減少します。

5. 結論

このチュートリアルでは、Kafkaトピックで消費者の遅れを見つける方法についての理解を深めました。 さらに、その知識を使用して、春に LagAnalyzerアプリケーションを作成し、ほぼリアルタイムでラグを表示しました。

いつものように、チュートリアルの完全なソースコードは、GitHubから入手できます。