JavaでのKafkaStreamsの概要
1. 概要
この記事では、KafkaStreamsライブラリについて説明します。
KafkaStreams ApacheKafkaの作成者によって設計されています
KafkaStreams を使用すると、Kafkaトピックからデータを消費し、データを分析または変換し、場合によっては別のKafkaトピックに送信できます。
KafkaStreamsをデモンストレーションするために、トピックから文を読み取り、単語の出現回数をカウントし、単語ごとのカウントを出力する簡単なアプリケーションを作成します。
KafkaStreams ライブラリはリアクティブではなく、非同期操作とバックプレッシャ処理をサポートしていないことに注意してください。
2. Mavenの依存関係
KafkaStreamsを使用してストリーム処理ロジックの記述を開始するには、 kafka-streamsおよびkafka-clientsに依存関係を追加する必要があります。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
また、Kafkaトピックを使用するため、ApacheKafkaをインストールして起動する必要があります。 このトピックは、ストリーミングジョブのデータソースになります。
Kafkaおよびその他の必要な依存関係は、公式Webサイトからダウンロードできます。
3. KafkaStreams入力の構成
最初に行うことは、入力Kafkaトピックの定義です。
ダウンロードしたConfluentツールを使用できます。このツールにはKafkaサーバーが含まれています。 また、Kafkaにメッセージを公開するために使用できるkafka-console-producerも含まれています。
開始するには、Kafkaクラスターを実行してみましょう。
./confluent start
Kafkaが起動したら、 APPLICATION_ID_CONFIG を使用して、データソースとアプリケーションの名前を定義できます。
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-live-test");
重要な構成パラメーターは
private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
次に、 inputTopic:から消費されるメッセージのキーのタイプと値を渡す必要があります。
streamsConfiguration.put(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
このテストでは、ローカルファイルシステムを使用しています。
this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());
4. ストリーミングトポロジの構築
入力トピックを定義したら、ストリーミングトポロジを作成できます。これは、イベントの処理方法と変換方法の定義です。
この例では、単語カウンターを実装したいと思います。 inputTopicに送信されるすべての文について、それを単語に分割し、すべての単語の出現を計算します。
KStreamsBuilder クラスのインスタンスを使用して、トポロジの構築を開始できます。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
単語数を実装するには、まず、正規表現を使用して値を分割する必要があります。
splitメソッドは配列を返しています。 flatMapValues()を使用してフラット化します。 そうしないと、配列のリストになってしまい、そのような構造を使用してコードを記述するのは不便になります。
最後に、すべての単語の値を集計し、特定の単語の出現を計算する count()を呼び出します。
5. 結果の処理
入力メッセージの単語数はすでに計算済みです。 次に、foreach()メソッドを使用して結果を標準出力に出力しましょう:
wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
本番環境では、このようなストリーミングジョブが出力を別のKafkaトピックに公開することがよくあります。
to()メソッドを使用してこれを行うことができます:
String outputTopic = "outputTopic";
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
Serde クラスは、オブジェクトをバイト配列にシリアル化するために使用されるJavaタイプ用に事前構成されたシリアライザーを提供します。 次に、バイトの配列がKafkaトピックに送信されます。
トピックのキーとしてStringを使用し、実際のカウントの値としてLongを使用しています。 to()メソッドは、結果のデータをoutputTopicに保存します。
6. KafkaStreamジョブの開始
ここまでで、実行可能なトポロジを構築しました。 しかし、仕事はまだ始まっていません。
KafkaStreamsインスタンスでstart()メソッドを呼び出して、ジョブを明示的に開始する必要があります。
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();
Thread.sleep(30000);
streams.close();
ジョブが終了するのを30秒待っていることに注意してください。 実際のシナリオでは、そのジョブは常に実行され、Kafkaからのイベントが到着すると処理されます。
Kafkaトピックにいくつかのイベントを公開することで、仕事をテストできます。
kafka -console-producer を起動し、いくつかのイベントを inputTopic:に手動で送信してみましょう。
./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"
このようにして、2つのイベントをKafkaに公開しました。 アプリケーションはこれらのイベントを消費し、次の出力を出力します。
word: -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word: -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2
最初のメッセージが到着したとき、ポニーという単語が1回だけ出現したことがわかります。 しかし、2番目のメッセージを送信すると、ポニーという単語が2回目の印刷で発生しました:「単語:ポニー->2」。
6. 結論
この記事では、データソースとしてApache Kafkaを使用し、ストリーム処理ライブラリとして KafkaStreams ライブラリを使用して、プライマリストリーム処理アプリケーションを作成する方法について説明します。
これらの例とコードスニペットはすべて、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。