1. 概要

Apache Kafka は、強力なオープンソースの分散型フォールトトレラントイベントストリーミングプラットフォームです。 ただし、Kafkaを使用して、構成されたサイズ制限を超えるメッセージを送信すると、エラーが発生します。

前のチュートリアルで、SpringとKafkaの操作方法を示しました。 このチュートリアルでは、Kafkaを使用して大きなメッセージを送信する方法を見ていきます。

2. 問題文

Kafka構成では、送信できるメッセージのサイズが制限されます。デフォルトでは、この制限は1MBです。 ただし、大きなメッセージを送信する必要がある場合は、要件に従ってこれらの構成を微調整する必要があります。

このチュートリアルでは、Kafka v2.5を使用しています。構成にジャンプする前に、まずKafkaのセットアップを調べてみましょう。

3. 設定

ここでは、単一のブローカーで基本的なKafkaセットアップを使用します。 また、プロデューサーアプリケーションは、Kafkaクライアントを使用して、定義されたトピックを介してKafkaブローカーにメッセージを送信できます。 さらに、単一のパーティショントピックを使用しています。

ここでは、Kafka Producer、Kafka Broker、Topic、KafkaConsumerなどの複数のインタラクションポイントを観察できます。 したがって、これらすべては、一方の端からもう一方の端に大きなメッセージを送信できるようにするために構成を更新する必要があります

20MBの大きなメッセージを送信するために、これらの構成を詳しく調べてみましょう。

3. Kafkaプロデューサー構成

これが私たちのメッセージの発信元です。 また、Spring Kafkaを使用して、アプリケーションからKafkaサーバーにメッセージを送信しています。

したがって、プロパティ「max.request.size」を最初に更新する必要があります。 このプロデューサー構成の詳細については、 KafkaDocumentationを参照してください。  これは、SpringKafka依存関係の一部として利用可能なKafkaクライアントライブラリで定数ProducerConfig.MAX_REQUEST_SIZE_CONFIGとして利用できます。

この値を20971520バイトに構成しましょう。

public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

    return new DefaultKafkaProducerFactory<>(configProps);
}

4. Kafkaトピック構成

メッセージ生成アプリケーションは、定義されたトピックでKafkaBrokerにメッセージを送信します。 したがって、次の要件は、使用するKafkaトピックを構成することです。 これは、デフォルト値が1MBの「max.message.bytes」プロパティ更新する必要があることを意味します。

これは、圧縮後のKafkaの最大レコードバッチサイズの値を保持します(圧縮が有効になっている場合)。 詳細については、 KafkaDocumentationを参照してください。

CLIコマンドを使用して、トピックの作成時にこのプロパティを手動で構成しましょう。

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520 

または、Kafkaクライアントを介してこのプロパティを構成することもできます。

public NewTopic topic() {
    NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
    Map<String, String> configs = new HashMap<>();
    configs.put("max.message.bytes", "20971520");
    newTopic.configs(configs);
    return newTopic;
}

少なくとも、これら2つのプロパティを構成する必要があります。

5. Kafkaブローカーの構成

オプションの構成プロパティ「message.max.bytes」を使用すると、Broker上のすべてのトピックが1MBを超えるサイズのメッセージを受け入れることができます。

そして、これは、圧縮後にKafkaによって許可される最大のレコードバッチサイズの値を保持します(圧縮が有効になっている場合)。 詳細については、 KafkaDocumentationを参照してください。

このプロパティをKafkaBrokerの「server.properties」構成ファイルに追加しましょう。

message.max.bytes=20971520

また、「 message.max.bytes」、「max.message.bytes」の最大値が有効値となります。

6. コンシューマー構成

Kafkaコンシューマーで使用可能な構成設定を調べてみましょう。 これらの変更は大きなメッセージを消費するために必須ではありませんが、それらを回避すると、コンシューマーアプリケーションのパフォーマンスに影響を与える可能性があります。 したがって、これらの構成も適切に配置することをお勧めします。

  • max.partition.fetch.bytes :このプロパティは、コンシューマーがトピックのパーティションからフェッチできるバイト数を制限します。 詳細については、 KafkaDocumentationを参照してください。 これは、Kafkaクライアントライブラリで ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG という名前の定数として使用できます。
  • fetch.max.bytes :このプロパティは、コンシューマーがKafkaサーバー自体からフェッチできるバイト数を制限します。 Kafkaコンシューマーは、複数のパーティションでリッスンすることもできます。 詳細については、 KafkaDocumentationを参照してください。 これは、Kafkaクライアントライブラリで定数ConsumerConfig.FETCH_MAX_BYTES_CONFIGとして利用できます。

したがって、コンシューマーを構成するには、KafkaConsumerFactoryを作成する必要があります。 Topic / Brokerconfigと比較して常に高い値を使用する必要があることを忘れないでください。

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
    return new DefaultKafkaConsumerFactory<>(props);
}

ここでは、単一のパーティション Topic を使用しているため、両方のプロパティに同じ構成値20971520バイトを使用しました。 ただし、 FETCH_MAX_BYTES_CONFIG の値は、MAX_PARTITION_FETCH_BYTES_CONFIGよりも大きくする必要があります。パーティション。 一方、config MAX_PARTITION_FETCH_BYTES_CONFIG は、単一のパーティションからのメッセージフェッチサイズを表します。

7. 代替案

Kafkaプロデューサー、トピック、ブローカー、およびKafkaコンシューマーのさまざまな構成を更新して、大きなメッセージを送信する方法を確認しました。 ただし、通常、Kafkaを使用して大きなメッセージを送信することは避けてください。 大きなメッセージの処理は、プロデューサーとコンシューマーのより多くのCPUとメモリを消費します。 したがって、最終的には、他のタスクの処理能力がいくらか制限されます。 また、これにより、エンドユーザーに目に見えて高い遅延が発生する可能性があります。

他の可能なオプションを調べてみましょう:

  1. Kafkaプロデューサーは、メッセージを圧縮する機能を提供します。 さらに、Compression.typeプロパティを使用して構成できるさまざまな圧縮タイプをサポートします。
  2. 大きなメッセージを共有ストレージの場所にあるファイルに保存し、Kafkaメッセージを介してその場所を送信できます。 これはより高速なオプションであり、処理のオーバーヘッドが最小限に抑えられます。
  3. 別のオプションは、プロデューサー側で大きなメッセージをそれぞれ1KBのサイズの小さなメッセージに分割することです。 その後、パーティションキーを使用してこれらすべてのメッセージを単一のパーティションに送信し、正しい順序を確認できます。 したがって、後で、コンシューマー側で、小さなメッセージから大きなメッセージを再構築できます。

上記のオプションのいずれも要件に合わない場合は、前述の構成を選択できます。

8. 結論

この記事では、サイズが1MBを超える大きなメッセージを送信するために必要なさまざまなKafka構成について説明しました。

プロデューサー、トピック、ブローカー、およびコンシューマー側での構成のニーズについて説明しました。 ただし、これらの一部は必須の構成であり、一部はオプションです。 さらに、コンシューマー構成はオプションですが、パフォーマンスへの悪影響を回避する必要があります。

最後に、大きなメッセージを送信するための代替可能なオプションについても説明しました。

いつものように、コード例はGitHub利用可能です。