1. 概要

この記事では、KafkaStreamsライブラリについて説明します。

KafkaStreams ApacheKafkaの作成者によって設計されていますこのソフトウェアの主な目標は、プログラマーがマイクロサービスとして機能する効率的なリアルタイムのストリーミングアプリケーションを作成できるようにすることです。

KafkaStreams を使用すると、Kafkaトピックからデータを消費し、データを分析または変換し、場合によっては別のKafkaトピックに送信できます。

KafkaStreamsをデモンストレーションするために、トピックから文を読み取り、単語の出現回数をカウントし、単語ごとのカウントを出力する簡単なアプリケーションを作成します。

KafkaStreams ライブラリはリアクティブではなく、非同期操作とバックプレッシャ処理をサポートしていないことに注意してください。

2. Mavenの依存関係

KafkaStreamsを使用してストリーム処理ロジックの記述を開始するには、 kafka-streamsおよびkafka-clientsに依存関係を追加する必要があります。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

また、Kafkaトピックを使用するため、ApacheKafkaをインストールして起動する必要があります。 このトピックは、ストリーミングジョブのデータソースになります。

Kafkaおよびその他の必要な依存関係は、公式Webサイトからダウンロードできます。

3. KafkaStreams入力の構成

最初に行うことは、入力Kafkaトピックの定義です。

ダウンロードしたConfluentツールを使用できます。このツールにはKafkaサーバーが含まれています。 また、Kafkaにメッセージを公開するために使用できるkafka-console-producerも含まれています。

開始するには、Kafkaクラスターを実行してみましょう。

./confluent start

Kafkaが起動したら、 APPLICATION_ID_CONFIG を使用して、データソースとアプリケーションの名前を定義できます。

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
  StreamsConfig.APPLICATION_ID_CONFIG, 
  "wordcount-live-test");

重要な構成パラメーターは BOOTSTRAP_SERVER_CONFIG。 これは、開始したばかりのローカルKafkaインスタンスへのURLです。

private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
  StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
  bootstrapServers);

次に、 inputTopic:から消費されるメッセージのキーのタイプと値を渡す必要があります。

streamsConfiguration.put(
  StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());
streamsConfiguration.put(
  StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
  Serdes.String().getClass().getName());

多くの場合、ストリーム処理はステートフルです。 中間結果を保存する場合は、STATE_DIR_CONFIGパラメーターを指定する必要があります。

このテストでは、ローカルファイルシステムを使用しています。

this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
  StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());

4. ストリーミングトポロジの構築

入力トピックを定義したら、ストリーミングトポロジを作成できます。これは、イベントの処理方法と変換方法の定義です。

この例では、単語カウンターを実装したいと思います。 inputTopicに送信されるすべての文について、それを単語に分割し、すべての単語の出現を計算します。

KStreamsBuilder クラスのインスタンスを使用して、トポロジの構築を開始できます。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
  .groupBy((key, word) -> word)
  .count();

単語数を実装するには、まず、正規表現を使用して値を分割する必要があります。

splitメソッドは配列を返しています。 flatMapValues()を使用してフラット化します。 そうしないと、配列のリストになってしまい、そのような構造を使用してコードを記述するのは不便になります。

最後に、すべての単語の値を集計し、特定の単語の出現を計算する count()を呼び出します。

5. 結果の処理

入力メッセージの単語数はすでに計算済みです。 次に、foreach()メソッドを使用して結果を標準出力に出力しましょう:

wordCounts.toStream()
  .foreach((word, count) -> System.out.println("word: " + word + " -> " + count));

本番環境では、このようなストリーミングジョブが出力を別のKafkaトピックに公開することがよくあります。

to()メソッドを使用してこれを行うことができます:

String outputTopic = "outputTopic";
wordCounts.toStream()
  .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

Serde クラスは、オブジェクトをバイト配列にシリアル化するために使用されるJavaタイプ用に事前構成されたシリアライザーを提供します。 次に、バイトの配列がKafkaトピックに送信されます。

トピックのキーとしてStringを使用し、実際のカウントの値としてLongを使用しています。 to()メソッドは、結果のデータをoutputTopicに保存します。

6. KafkaStreamジョブの開始

ここまでで、実行可能なトポロジを構築しました。 しかし、仕事はまだ始まっていません。

KafkaStreamsインスタンスでstart()メソッドを呼び出して、ジョブを明示的に開始する必要があります。

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();

Thread.sleep(30000);
streams.close();

ジョブが終了するのを30秒待っていることに注意してください。 実際のシナリオでは、そのジョブは常に実行され、Kafkaからのイベントが到着すると処理されます。

Kafkaトピックにいくつかのイベントを公開することで、仕事をテストできます。

kafka -console-producer を起動し、いくつかのイベントを inputTopic:に手動で送信してみましょう。

./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"

このようにして、2つのイベントをKafkaに公開しました。 アプリケーションはこれらのイベントを消費し、次の出力を出力します。

word:  -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word:  -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2

最初のメッセージが到着したとき、ポニーという単語が1回だけ出現したことがわかります。 しかし、2番目のメッセージを送信すると、ポニーという単語が2回目の印刷で発生しました:「単語:ポニー->2」

6. 結論

この記事では、データソースとしてApache Kafkaを使用し、ストリーム処理ライブラリとして KafkaStreams ライブラリを使用して、プライマリストリーム処理アプリケーションを作成する方法について説明します。

これらの例とコードスニペットはすべて、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。