1. 序章

Apache Kafka は、最も人気のあるオープンソースの分散型でフォールトトレラントなストリーム処理システムです。 Kafka Consumerは、メッセージを処理するための基本的な機能を提供します。 Kafka Streamsは、KafkaConsumerクライアント上でリアルタイムのストリーム処理も提供します。

このチュートリアルでは、ストリーム処理エクスペリエンスをシンプルかつ簡単にするためのKafkaStreamsの機能について説明します。

2. ストリームとコンシューマーAPIの違い

2.1. KafkaコンシューマーAPI

一言で言えば、 Kafka Consumer API を使用すると、アプリケーションはトピックからのメッセージを処理できます。 次の機能を含む、それらと対話するための基本コンポーネントを提供します

  • 消費者と生産者の間の責任の分離
  • 単一処理
  • バッチ処理のサポート
  • ステートレスサポートのみ。 クライアントは以前の状態を保持せず、ストリーム内の各レコードを個別に評価します
  • アプリケーションを書くにはたくさんのコードが必要です
  • スレッド化や並列処理は使用しません
  • 複数のKafkaクラスターに書き込むことができます

2.2. Kafka Streams API

Kafka Streams は、トピックからのストリーム処理を大幅に簡素化します。 Kafkaクライアントライブラリ上に構築され、データの並列処理、分散調整、フォールトトレランス、およびスケーラビリティを提供します。 メッセージは、次の特性を備えた、無制限の継続的なリアルタイムのレコードフローとして処理されます。

  • 消費および生産する単一のカフカストリーム
  • 複雑な処理を実行する
  • バッチ処理をサポートしていません
  • ステートレスおよびステートフル操作をサポートする
  • アプリケーションを作成するには、数行のコードが必要です
  • スレッディングと並列処理
  • 単一のKafkaクラスターとのみ対話する
  • メッセージを保存および転送するための論理ユニットとしてパーティションとタスクをストリーミングする

Kafka Streamsは、 パーティションとタスクの概念を、トピックパーティションに強くリンクされた論理ユニットとして使用します。 さらに、スレッドを使用して、アプリケーションインスタンス内の処理を並列化します。 サポートされているもう1つの重要な機能は、トピックからのデータを保存およびクエリするためにKafkaStreamsによって使用される状態ストアです。 最後に、Kafka Streams APIはクラスターと対話しますが、クラスター上で直接実行されることはありません。

次のセクションでは、基本的なKafkaクライアントに関して違いを生む4つの側面に焦点を当てます。ストリームテーブルの二重性、Kafka Streamsドメイン固有言語(DSL)、Exactly-Once Processing Semantics(EOS)、およびインタラクティブクエリです。 。

2.3. 依存関係

例を実装するには、 Kafka ConsumerAPIKafkaStreamsAPIの依存関係をpom.xmlに追加するだけです。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
 </dependency>

3. ストリームテーブルの二重性

Kafka Streamsはストリームをサポートしますが、双方向に変換できるテーブルもサポートします。  いわゆるストリームテーブルの二重性 。 表は、進化する事実のセットです。 新しいイベントはそれぞれ古いイベントを上書きしますが、ストリームは不変のファクトのコレクションです。

ストリームは、トピックからのデータの完全なフローを処理します。 テーブルは、ストリームからの情報を集約することによって状態を格納します。 Kafka DataModellingで説明されているチェスゲームをプレイすることを想像してみましょう。 連続移動のストリームはテーブルに集約され、ある状態から別の状態に遷移できます。

3.1. KStream KTable 、および GlobalKTable

Kafka Streamsは、StreamsとTablesに2つの抽象化を提供します。 KStreamはレコードのストリームを処理します。 一方、 KTable は、特定のキーの最新の状態で変更ログストリームを管理します。 各データレコードは更新を表します。

パーティション化されていないテーブルには別の抽象化があります。 GlobalKTables を使用して、すべてのタスクに情報をブロードキャストしたり、入力データを再パーティション化せずに結合を実行したりできます。

トピックをストリームとして読み取り、逆シリアル化できます。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = 
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

トピックを読んで、テーブルとして受け取った最新の単語を追跡することもできます。

KTable<String, String> textLinesTable = 
  builder.table(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

最後に、グローバルテーブルを使用してトピックを読み取ることができます。

GlobalKTable<String, String> textLinesGlobalTable = 
  builder.globalTable(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

4. Kafka Streams DSL

Kafka Streams DSLは、宣言型で関数型のプログラミングスタイルです Streams ProcessorAPIの上に構築されています。 この言語は、前のセクションで説明したストリームとテーブルの組み込みの抽象化を提供します。

さらに、ステートレス(マップフィルターなど)およびステートフル変換(集約結合、および)もサポートします。 windowing )。 したがって、わずか数行のコードでストリーム処理操作を実装できます。

4.1. ステートレス変換

ステートレス変換は、処理に状態を必要としません。 同様に、ストリームプロセッサには状態ストアは必要ありません。 操作例には、 filter map flatMap、、またはgroupByが含まれます。

次に、値を大文字としてマップし、トピックからフィルターして、ストリームとして保存する方法を見てみましょう。

KStream<String, String> textLinesUpperCase =
  textLines
    .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
    .filter((key, value) -> value.contains("FILTER"));

4.2. ステートフルな変換

ステートフル変換は、処理操作を実行するために状態に依存します。 メッセージの処理は、他のメッセージ(状態ストア)の処理に依存します。 つまり、changelogトピックを使用して、任意のテーブルまたは状態ストアを復元できます。

ステートフル変換の例は、単語数アルゴリズムです。

KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(value
    .toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, word) -> word)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

これらの2つの文字列をトピックに送信します。

String TEXT_EXAMPLE_1 = "test test and test";
String TEXT_EXAMPLE_2 = "test filter filter this sentence";

結果は次のとおりです。

Word: and -> 1
Word: test -> 4
Word: filter -> 2
Word: this -> 1
Word: sentence -> 1

DSLはいくつかの変換機能をカバーしています。 結合するか、2つの入力ストリーム/テーブルを同じキーでマージして新しいストリーム/テーブルを作成できます。 また、集約、またはストリーム/テーブルからの複数のレコードを新しいテーブルの1つのレコードに結合することもできます。 最後に、 windowing を適用して、結合または集計関数で同じキーを持つレコードをグループ化することができます。

5sウィンドウで結合する例では、キーによってグループ化されたレコードを2つのストリームから1つのストリームにマージします。

KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
  (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(Duration.ofSeconds(5))).groupByKey()
      .reduce(((key, lastValue) -> lastValue))
  .toStream();

したがって、 key =1の左側のストリームvalue= left と、右側のストリーム value =rightおよびkey=2を配置します。 ]。 結果は次のとおりです。

(key= 1) -> (left=left, right=null)
(key= 2) -> (left=null, right=right)

集計の例では、単語数アルゴリズムを計算しますが、キーとして各単語の最初の2文字を使用します。

KTable<String, Long> aggregated = input
  .groupBy((key, value) -> (value != null && value.length() > 0)
    ? value.substring(0, 2).toLowerCase() : "",
    Grouped.with(Serdes.String(), Serdes.String()))
  .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
    Materialized.with(Serdes.String(), Serdes.Long()));

次のエントリを使用します。

"one", "two", "three", "four", "five"

出力は次のとおりです。

Word: on -> 3
Word: tw -> 3
Word: th -> 5
Word: fo -> 4
Word: fi -> 4

5. 正確に一度の処理セマンティクス(EOS)

コンシューマーがメッセージを1回だけ読むようにする必要がある場合があります。 Kafkaは、 Transactional API を使用してEOSを実装するために、メッセージをトランザクションに含める機能を導入しました。 同じ機能がバージョン0.11.0以降のKafkaStreamsでカバーされています。

Kafka StreamsでEOSを構成するために、次のプロパティを含めます。

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
  StreamsConfig.EXACTLY_ONCE);

6. インタラクティブクエリ

インタラクティブクエリを使用すると、分散環境でのアプリケーションの状態を調べることができます。 これは、ローカルストアからだけでなく、複数のインスタンスのリモートストアからも情報を抽出する機能を意味します。 基本的に、すべてのストアを収集してグループ化し、アプリケーションの完全な状態を取得します。

インタラクティブクエリを使用した例を見てみましょう。 まず、処理トポロジ(この場合は単語数アルゴリズム)を定義します。

KStream<String, String> textLines = 
  builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

final KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

次に、計算されたすべての単語数の状態ストア(キー値)を作成します。

groupedByWord
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
  .withValueSerde(Serdes.Long()));

次に、Key-Valueストアにクエリを実行できます。

ReadOnlyKeyValueStore<String, Long> keyValueStore =
  streams.store(StoreQueryParameters.fromNameAndType(
    "WordCountsStore", QueryableStoreTypes.keyValueStore()));

KeyValueIterator<String, Long> range = keyValueStore.all();
while (range.hasNext()) {
    KeyValue<String, Long> next = range.next();
    System.out.println("count for " + next.key + ": " + next.value);
}

例の出力は次のとおりです。

Count for and: 1
Count for filter: 2
Count for sentence: 1
Count for test: 4
Count for this: 1

7. 結論

このチュートリアルでは、KafkaStreamsがKafkaトピックからメッセージを取得する際の処理操作を簡素化する方法を示しました。 これにより、Kafkaでストリームを処理する際の実装が大幅に容易になります。 ステートレス処理だけでなく、ステートフル変換にも使用できます。

もちろん、KafkaStreamsを使用せずにコンシューマーアプリケーションを完全に構築することは可能です。 ただし、無料で提供される多数の追加機能を手動で実装する必要があります。

いつものように、コードはGitHubから入手できます。