1概要

この記事では、https://kafka.apache.org/documentation/streams/[

KafkaStreams

library]を調べます。


KafkaStreams

は、Apache Kafkaの作成者によって設計されたものです。このソフトウェアの主な目的は、プログラマがマイクロサービスとして機能する可能性のある効率的なリアルタイムストリーミングアプリケーションを作成できるようにすることです。


  • KafkaStreams

    を使用すると、Kafkaトピックからデータを分析したり、データを分析または変換したり、場合によっては別のKafkaトピックに送信したりできます。

__KafkaStreamsを説明するために、トピックから文を読み取り、単語の出現回数をカウントし、単語ごとのカウントを印刷する簡単なアプリケーションを作成します。

重要なのは、

KafkaStreams

ライブラリは反応的ではなく、非同期操作やバックプレッシャー処理をサポートしていないことです。


2 Mavenの依存関係


KafkaStreamsを使用してストリーム処理ロジックの作成を開始するには、


https://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22org.apache.kafka%22%20AND%に依存関係を追加する必要があります。

20a%3A%22kafka-streams%22[

kafka-streams

]およびhttps://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22org.apache.kafka%22%20AND%20a% 3A%22kafka-clients%22[

kafka-clients

]:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

Kafkaのトピックを使用するので、Apache Kafkaもインストールして起動する必要があります。このトピックは、ストリーミングジョブのデータソースになります。

Kafkaやその他の必要な依存関係はhttps://www.confluent.io/download/[the official website]からダウンロードできます。


3 KafkaStreams入力の設定

最初にすることは、入力Kafkaトピックの定義です。

ダウンロードした

Confluent

ツールを使用できます。これにはKafkaサーバーが含まれています。 Kafkaにメッセージをパブリッシュするために使用できる

kafka-console-producer

も含まれています。

はじめに、Kafkaクラスタを実行しましょう。

./confluent start

Kafkaが起動したら、

APPLICATION

ID

CONFIG

を使用してデータソースとアプリケーションの名前を定義できます。

String inputTopic = "inputTopic";

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
  StreamsConfig.APPLICATION__ID__CONFIG,
  "wordcount-live-test");

重要な設定パラメータは

BOOTSTRAP

SERVER__CONFIGです。

private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
  StreamsConfig.BOOTSTRAP__SERVERS__CONFIG,
  bootstrapServers);

次に、__inputTopicから消費されるメッセージのキーの種類と値を渡します。

streamsConfiguration.put(
  StreamsConfig.DEFAULT__KEY__SERDE__CLASS__CONFIG,
  Serdes.String().getClass().getName());
streamsConfiguration.put(
  StreamsConfig.DEFAULT__VALUE__SERDE__CLASS__CONFIG,
  Serdes.String().getClass().getName());

  • ストリーム処理はしばしばステートフルです。中間結果を保存したいときは、

    STATE

    DIR

    CONFIG

    パラメータ** を指定する必要があります。

今回のテストでは、ローカルファイルシステムを使用しています。

streamsConfiguration.put(
  StreamsConfig.STATE__DIR__CONFIG,
  TestUtils.tempDirectory().getAbsolutePath());


4ストリーミングトポロジの構築

入力トピックを定義したら、ストリーミングトポロジ(イベントの処理方法と変換方法を定義したもの)を作成できます。

この例では、単語カウンタを実装します。

inputTopicに送信されたすべての文に対して、

を単語に分割し、すべての単語の出現回数を計算します。

トポロジの構築を開始するために

KStreamsBuilder

クラスのインスタンスを使用できます。

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE__CHARACTER__CLASS);

KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
  .groupBy((key, word) -> word)
  .count();

単語数を実装するには、まず正規表現を使用して値を分割する必要があります。

splitメソッドは配列を返しています。平坦化するために

flatMapValues()

を使用しています。そうでなければ、配列のリストになってしまい、そのような構造を使ってコードを書くのは不便です。

最後に、各単語の値を集計し、特定の単語の出現回数を計算する

count()

を呼び出します。


5処理結果

入力メッセージの単語数はすでに計算済みです。

それでは、

foreach()

メソッドを使って結果を標準出力に出力しましょう:

wordCounts
  .foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

プロダクションでは、そのようなストリーミングジョブはしばしば他のKafkaトピックに出力を公開するかもしれません。

__to()メソッドを使用してこれを実行できます。

String outputTopic = "outputTopic";
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);


Serde

クラスは、オブジェクトをバイト配列にシリアル化するために使用されるJava型用の事前設定済みのシリアライザを提供します。バイトの配列はKafkaトピックに送信されます。

トピックのキーとして

String

を使用し、実際の数の値として

Long

を使用します。

to()

メソッドは、結果のデータを

outputTopic

に保存します。


6. KafkaStreamジョブの開始

ここまでで、実行可能なトポロジを構築しました。しかし、仕事はまだ始まっていません。


  • KafkaStreams

    インスタンスの

    start()

    メソッドを呼び出して、明示的にジョブを開始する必要があります。

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

Thread.sleep(30000);
streams.close();

仕事が終わるのを30秒間待っていることに注意してください。現実のシナリオでは、そのジョブは常に実行され、Kafkaからのイベントが到着したときに処理されます。

  • Kafkaのトピックにイベントを公開することで、仕事をテストできます。


kafka-console-producer

を起動し、手動で

inputTopicにイベントを送信しましょう:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"

このようにして、Kafkaに2つのイベントを公開しました。私たちのアプリケーションはそれらのイベントを消費し、次の出力を出力します。

word:  -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word:  -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2

最初のメッセージが到着したとき、単語

pony

は一度だけ現れたことがわかります。しかし、2番目のメッセージを送信したとき、2回目の印刷では

pony

という単語が発生しました。「

word:pony – > 2 ”


6. 結論

この記事では、Apache Kafkaをデータ・ソースとして使用し、

KafkaStreams

ライブラリーをストリーム処理ライブラリーとして使用して、1次ストリーム処理アプリケーションを作成する方法について説明します。

これらの例とコードスニペットはすべてhttps://github.com/eugenp/tutorials/tree/master/libraries-data[GitHubプロジェクト]にあります。これはMavenプロジェクトなので、インポートおよび実行が簡単です。そのまま。