1. 概要

このチュートリアルでは、 Kafkaが、新しく導入されたTransactionalAPIを介してプロデューサーアプリケーションとコンシューマーアプリケーション間で1回限りの配信を保証する方法を見ていきます。

さらに、このAPIを使用して、トランザクションのプロデューサーとコンシューマーを実装し、WordCountの例でエンドツーエンドの正確に1回の配信を実現します。

2. カフカでのメッセージ配信

さまざまな障害が原因で、メッセージングシステムはプロデューサーアプリケーションとコンシューマーアプリケーション間のメッセージ配信を保証できません。 クライアントアプリケーションがそのようなシステムとどのように相互作用するかに応じて、次のメッセージセマンティクスが可能です。

  • メッセージングシステムがメッセージを複製することは決してないが、時折メッセージを見逃す可能性がある場合、それを最大1回と呼びます。
  • または、メッセージを見逃すことはないが、時折メッセージが重複する可能性がある場合は、at-least-onceと呼びます。
  • ただし、常にすべてのメッセージを重複せずに配信する場合、それは正確に1回です。

当初、Kafkaは最大1回および少なくとも1回のメッセージ配信のみをサポートしていました。

ただし、 Kafkaブローカーとクライアントアプリケーション間のトランザクションの導入により、Kafkaでの1回限りの配信が保証されます。 それをよりよく理解するために、トランザクションクライアントAPIを簡単に確認しましょう。

3. Mavenの依存関係

トランザクションAPIを使用するには、pomにKafkaのJavaクライアントが必要です。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

4. トランザクション消費-変換-生成ループ

この例では、入力トピックセンテンスからのメッセージを使用します。

次に、文ごとに、すべての単語をカウントし、個々の単語カウントを出力トピックcountsに送信します。

この例では、トピックで利用可能なトランザクションデータがすでに存在すると想定します。

4.1. トランザクション対応のプロデューサー

それでは、最初に典型的なKafkaプロデューサーを追加しましょう。

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");

さらに、 transactional.id を指定し、べき等を有効にする必要があります。

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");

KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

べき等を有効にしたため、Kafkaはアルゴリズムの一部としてこのトランザクションIDを使用して、このプロデューサー が送信するメッセージを重複排除し、べき等を確保します。

簡単に言えば、プロデューサーが誤って同じメッセージをKafkaに複数回送信した場合、これらの設定により、プロデューサーは気付くことができます。

トランザクションIDがプロデューサーごとに異なることを確認するだけですが、再起動間で一貫性があります。

4.2. トランザクションのプロデューサーの有効化

準備ができたら、 initTransaction を呼び出して、プロデューサーがトランザクションを使用できるように準備する必要があります。

producer.initTransactions();

これにより、プロデューサーがトランザクションを使用できるブローカーとして登録され、そのtransactional.idとシーケンス番号またはエポックによって識別されます。 次に、ブローカーはこれらを使用して、アクションをトランザクションログに先行書き込みします。

その結果、ブローカーは、同じトランザクションIDと以前の エポック、が無効なトランザクションからのものであると想定して、プロデューサーに属するすべてのアクションをログから削除します。

4.3. トランザクションを意識した消費者

消費すると、トピックパーティション上のすべてのメッセージを順番に読み取ることができます。 ただし、は、関連するトランザクションがコミットされるまでトランザクションメッセージの読み取りを待機する必要があることをisolation.levelで示すことができます

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(singleton(“sentences”));

read_committed の値を使用すると、トランザクションが完了する前にトランザクションメッセージを読み取らないようになります。

isolation.levelのデフォルト値はread_uncommittedです。

4.4. トランザクションによる消費と変換

プロデューサーとコンシューマーの両方がトランザクションで書き込みと読み取りを行うように構成されたので、入力トピックのレコードを使用して、各レコードの各単語をカウントできます。

ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap =
  records.records(new TopicPartition("input", 0))
    .stream()
    .flatMap(record -> Stream.of(record.value().split(" ")))
    .map(word -> Tuple.of(word, 1))
    .collect(Collectors.toMap(tuple -> 
      tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));

上記のコードについてはトランザクションがないことに注意してください。 ただし、 read_committedを使用したため、同じトランザクションで入力トピックに書き込まれたメッセージは、すべて書き込まれるまでこのコンシューマーによって読み取られないことを意味します。

これで、計算された単語数を出力トピックに送信できます。

トランザクションでも結果を生成する方法を見てみましょう。

4.5. APIを送信する

カウントを新しいメッセージとして送信するために、同じトランザクションで、beginTransactionを呼び出します。

producer.beginTransaction();

次に、キーが単語でカウントが値である「カウント」トピックにそれぞれを書き込むことができます。

wordCountMap.forEach((key,value) -> 
    producer.send(new ProducerRecord<String,String>("counts",key,value.toString())));

プロデューサーはキーでデータを分割できるため、トランザクションメッセージは複数のパーティションにまたがることができ、それぞれが別々のコンシューマーによって読み取られることに注意してください。したがって、Kafkaブローカーは更新されたすべてのパーティションのリストを保存します。取引。

また、トランザクション内で、プロデューサーは複数のスレッドを使用してレコードを並列に送信できることにも注意してください

4.6. オフセットのコミット

そして最後に、消費を終えたばかりのオフセットをコミットする必要があります。 トランザクションでは、通常のように、オフセットを読み取った入力トピックにコミットします。ただし、 weはそれらをプロデューサーのトランザクションに送信します。

これらすべてを1回の呼び出しで実行できますが、最初に各トピックパーティションのオフセットを計算する必要があります。

Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
    long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}

トランザクションにコミットするのは次のオフセットであることに注意してください。つまり、1を追加する必要があります。

次に、計算されたオフセットをトランザクションに送信できます。

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. トランザクションのコミットまたは中止

そして最後に、トランザクションをコミットできます。これにより、オフセットがConsumer_offsetsトピックとトランザクション自体にアトミックに書き込まれます。

producer.commitTransaction();

これにより、バッファリングされたメッセージがそれぞれのパーティションにフラッシュされます。 さらに、Kafkaブローカーは、そのトランザクション内のすべてのメッセージをコンシューマーが利用できるようにします。

もちろん、処理中に問題が発生した場合、たとえば例外が発生した場合は、 abortTransaction:を呼び出すことができます。

try {
  // ... read from input topic
  // ... transform
  // ... write to output topic
  producer.commitTransaction();
} catch ( Exception e ) {
  producer.abortTransaction();
}

そして、バッファリングされたメッセージをすべて削除し、ブローカーからトランザクションを削除します。

ブローカーが構成したmax.transaction.timeout.msの前にコミットも中止もしなかった場合、Kafkaブローカーはトランザクション自体を中止します。 このプロパティのデフォルト値は900,000ミリ秒または15分です。

5. その他のconsume-transform-produceループ

今見たのは、同じKafkaクラスターに対して読み取りと書き込みを行う基本的なconsume-transform-produceループです。

逆に、異なるKafkaクラスターに対して読み取りと書き込みを行う必要があるアプリケーションは、古いcommitSyncおよびcommitAsyncAPIを使用する必要があります。 通常、アプリケーションは、トランザクション性を維持するために、コンシューマーオフセットを外部状態ストレージに格納します。

6. 結論

データクリティカルなアプリケーションの場合、エンドツーエンドの正確に1回の処理が必要になることがよくあります。

このチュートリアルでは、トランザクションを使用して、Kafkaを使用してこれを正確に実行する方法を確認し、トランザクションベースの単語カウントの例を実装して原理を説明しました。

GitHubですべてのコードサンプルを確認してください。