1. 概要

プロデューサーがApacheKafkaにメッセージを送信すると、プロデューサーはそのメッセージをログファイルに追加し、設定された期間保持します。

このチュートリアルでは、Kafkaトピックの時間ベースのメッセージ保持プロパティを構成する方法を学習します。

2. 時間ベースの保持

保持期間のプロパティが設定されている場合、メッセージにはTTL(存続時間)があります。 有効期限が切れると、メッセージに削除のマークが付けられ、ディスク領域が解放されます。

同じ保持期間プロパティが、特定のKafkaトピック内のすべてのメッセージに適用されます。 さらに、これらのプロパティは、トピックを作成する前に設定することも、実行時に既存のトピックに変更することもできます。

次のセクションでは、新しいトピックの保持期間を設定するためのブローカー構成と、実行時にそれを制御するためのトピックレベルの構成を介してこれを調整する方法を学習します。

3. サーバーレベルの構成

Apache Kafkaは、サーバーレベルの保持ポリシーをサポートしています。これは、3つの時間ベースの構成プロパティの1つを正確に構成することで調整できます。

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

Kafkaは、精度の低い値を高い値でオーバーライドすることを理解することが重要です。 したがって、log.retention.msが最も優先されます

3.1. 基本

まず、ApacheKafkaディレクトリからgrepコマンドを実行して、保持のデフォルト値を調べます。

$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168

ここで、デフォルトの保持期間が7日であることがわかります。

メッセージを10分間だけ保持するために、 config /server.propertieslog.retention.minutesプロパティの値を設定できます。

log.retention.minutes=10

3.2. 新しいトピックの保持期間

Apache Kafkaパッケージには、管理タスクの実行に使用できるいくつかのシェルスクリプトが含まれています。 これらを使用して、このチュートリアルの過程で使用するヘルパースクリプトfunctions.sh作成します

features.sh に2つの関数を追加して、トピックを作成し、その構成をそれぞれ説明することから始めましょう。

function create_topic {
    topic_name="$1"
    bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
      --partitions 1 --replication-factor 1 \
      --zookeeper localhost:2181
}

function describe_topic_config {
    topic_name="$1"
    ./bin/kafka-configs.sh --describe --all \
      --bootstrap-server=0.0.0.0:9092 \
      --topic ${topic_name}
}

次に、create-topic.shget-topic-retention-time.shの2つのスタンドアロンスクリプトを作成しましょう。

bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?

describe_topic_config は、トピック用に構成されたすべてのプロパティを提供することに注意する必要があります。 そこで、 awk ワンライナーを使用して、retention.msプロパティのフィルターを追加しました。

最後に、 Kafka環境を開始し、新しいサンプルトピックの保持期間の構成を確認しましょう。

bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000

トピックを作成して説明すると、retention.msが600000(10分)に設定されていることがわかります。 これは実際には、server.propertiesファイルで以前に定義したlog.retention.minutesプロパティから派生したです。

4. トピックレベルの構成

Brokerサーバーが起動すると、log.retention。{hours | minutes|ms}サーバーレベルのプロパティが読み取り専用になります。 一方、retention.ms プロパティにアクセスできます。これは、トピックレベルで調整できます。

features.sh スクリプトにメソッドを追加して、トピックのプロパティを構成しましょう。

function alter_topic_config {
    topic_name="$1"
    config_name="$2"
    config_value="$3"
    ./bin/kafka-configs.sh --alter \
      --add-config ${config_name}=${config_value} \
      --bootstrap-server=0.0.0.0:9092 \
      --topic ${topic_name}
}

次に、これをalter-topic-config.shスクリプト内で使用できます。

#!/bin/sh
. ./functions.sh

alter_topic_retention_config $1 $2 $3
exit $?

最後に、 test-topic の保持時間を5分に設定し、同じことを確認しましょう。

bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.

bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000

5. 検証

これまで、Kafkaトピック内のメッセージの保持期間を構成する方法を見てきました。 保持タイムアウト後にメッセージが実際に期限切れになることを検証するときが来ました。

5.1. 生産者/消費者

追加しましょう produce_message消費メッセージの機能関数.sh。 内部的には、これらは kafka-console-producer.sh kafka-console-consumer.sh 、それぞれ、メッセージを生成/消費するために:

function produce_message {
    topic_name="$1"
    message="$2"
    echo "${message}" | ./bin/kafka-console-producer.sh \
    --bootstrap-server=0.0.0.0:9092 \
    --topic ${topic_name}
}

function consume_message {
    topic_name="$1"
    timeout="$2"
    ./bin/kafka-console-consumer.sh \
    --bootstrap-server=0.0.0.0:9092 \
    --from-beginning \
    --topic ${topic_name} \
    --max-messages 1 \
    --timeout-ms $timeout
}

[X11X]がKafkaで利用可能なメッセージを読み取るコンシューマーが必要なため、コンシューマーは常に最初からメッセージを読み取ることに注意する必要があります

次に、スタンドアロンのメッセージプロデューサーを作成しましょう。

bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"

produce_message ${topic_name} ${message}
exit $?

最後に、スタンドアロンのメッセージコンシューマーを作成しましょう。

bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"

consume_message ${topic_name} $timeout
exit $?

5.2. メッセージの有効期限

基本的なセットアップの準備ができたので、1つのメッセージを生成し、それを即座に2回消費しましょう。

bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages

したがって、消費者が利用可能なメッセージを繰り返し消費していることがわかります。

それでは、5分のスリープ遅延を導入してから、メッセージを消費してみましょう。

bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

予想どおり、コンシューマーは、メッセージが保持期間を超えたため、消費するメッセージを見つけられませんでした。

6. 制限事項

内部的には、KafkaBrokerはと呼ばれる別のプロパティを維持しています log.retention.check.interval.ms。 このプロパティは、メッセージの有効期限がチェックされる頻度を決定します。

したがって、保持ポリシーを有効に保つには、log.retention.check.interval.msの値がretention.msのプロパティ値よりも小さいことを確認する必要があります。与えられたトピック。

7. 結論

このチュートリアルでは、 Apache Kafkaを調べて、メッセージの時間ベースの保持ポリシーを理解しました。 その過程で、管理活動を簡素化するための簡単なシェルスクリプトを作成しました。 その後、保持期間後のメッセージの有効期限を検証するために、スタンドアロンのコンシューマーとプロデューサーを作成しました。