1. 概要

この記事では、ApacheKafkaトピックからデータをパージするためのいくつかの戦略について説明します。

2. クリーンアップシナリオ

データをクリーンアップするための戦略を学ぶ前に、パージアクティビティを必要とする簡単なシナリオについて理解しましょう。

2.1. シナリオ

Apache Kafka のメッセージは、設定された保持時間の後に自動的に期限切れになります。 それでも、場合によっては、メッセージの削除をすぐに実行したいことがあります。

Kafkaトピックでメッセージを生成しているアプリケーションコードに欠陥が導入されたと想像してみましょう。 バグ修正が統合されるまでに、Kafkaトピックにはすでに多くの破損メッセージがあり、すぐに使用できます。

このような問題は開発環境で最も一般的であり、迅速な結果が必要です。 したがって、メッセージの一括削除は合理的な方法です。

2.2. シミュレーション

シナリオをシミュレートするために、Kafkaインストールディレクトリからパージシナリオトピックを作成することから始めましょう。

$ bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181

次に、 shuf コマンドを使用してランダムデータを生成し、それをkafka-console-producer.shスクリプトにフィードします。

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
  | tee -a /tmp/kafka-random-data \
  | bin/kafka-console-producer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

tee コマンドを使用して、後で使用するためにシミュレーションデータを保存したことに注意する必要があります。

最後に、コンシューマーがトピックからのメッセージを消費できることを確認しましょう。

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 3
76696
49425
1744
Processed a total of 3 messages

3. メッセージの有効期限

パージシナリオトピックで生成されたメッセージには、デフォルトの保持期間が7日間あります。 メッセージを削除するには、一時的にretention.msトピックレベルプロパティを10秒にリセットし、メッセージの有効期限が切れるのを待ちます。

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=10000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario \
  && sleep 10

次に、メッセージがトピックから期限切れになっていることを確認しましょう。

$ bin/kafka-console-consumer.sh  \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

最後に、トピックの元の保持期間である7日間を復元できます。

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=604800000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

このアプローチでは、Kafkaはpurge-scenarioトピックのすべてのパーティションにわたってメッセージをパージします。

4. 選択的レコードの削除

特定のトピックから1つ以上のパーティション内のレコードを選択的に削除したい場合があります。 kafka-delete-records.sh スクリプトを使用することで、このような要件を満たすことができます。

まず、delete-config.json構成ファイルでパーティションレベルのオフセットを指定する必要があります。

offset = -1 を使用して、 partition =1からすべてのメッセージを削除しましょう。

{
  "partitions": [
    {
      "topic": "purge-scenario",
      "partition": 1,
      "offset": -1
    }
  ],
  "version": 1
}

次に、レコードの削除に進みましょう。

$ bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file delete-config.json

partition =0からまだ読み取れることを確認できます。

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario --partition=0 \
  --max-messages 1 --timeout-ms 1000
  44017
  Processed a total of 1 messages

ただし、 partition = 1 から読み取る場合、処理するレコードはありません。

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --partition=1 \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

5. トピックを削除して再作成する

Kafkaトピックのすべてのメッセージを削除する別の回避策は、それを削除して再作成することです。 ただし、これは、Kafkaサーバーの起動中にdelete.topic.enableプロパティを trueに設定した場合にのみ可能です。

$ bin/kafka-server-start.sh config/server.properties \
  --override delete.topic.enable=true

トピックを削除するには、kafka-topics.shスクリプトを使用できます。

$ bin/kafka-topics.sh \
  --delete --topic purge-scenario \
  --zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

トピックをリストして確認しましょう:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

トピックがリストされていないことを確認したら、次に進んでトピックを再作成できます。

6. 結論

このチュートリアルでは、ApacheKafkaトピックを削除する必要があるシナリオをシミュレートしました。 さらに、 複数の戦略を検討して、パーティション間で完全にまたは選択的にパージしました