Kafka、Spark Streaming、Cassandraを使用したデータパイプラインの構築

1. 概要

https://kafka.apache.org/[Apache Kafka]は、*メッセージングシステムのようなデータストリームの読み取りと書き込みを可能にする、スケーラブルで高性能、低遅延のプラットフォームです。 link:/spring-kafka[JavaでKafkaから始めることができます]かなり簡単です。
https://spark.apache.org/streaming/[Spark Streaming]は、* https://spark.apache.org/ [Apache Spark]プラットフォームの一部であり、*データストリームのスケーラブルで高スループット、フォールトトレラントな処理を可能にします* 。 Scalaで記述されていますが、https://www.baeldung.com/apache-spark [Sparkは動作するJava APIを提供しています]。
https://cassandra.apache.org/[Apache Cassandra]は、分散型で幅の広いNoSQLデータストア*です。 link:/cassandra-with-java[Cassandraの詳細]は、以前の記事で入手できます。
このチュートリアルでは、これらを組み合わせて、リアルタイムのデータストリーム用の高度にスケーラブルでフォールトトレラントなデータパイプラインを作成します。

2. インストール

開始するには、アプリケーションを実行するためにマシンにローカルにインストールされたKafka、Spark、Cassandraが必要です。 これらのプラットフォームを使用して、データパイプラインを開発する方法を説明します。
ただし、すべてのインストール用のポートを含むすべてのデフォルト構成はそのままにしておきます。これにより、チュートリアルをスムーズに実行できます。

2.1. カフカ

ローカルマシンにKafkaをインストールするのはかなり簡単で、https://kafka.apache.org/quickstart [公式ドキュメント]の一部として見つけることができます。 Kafkaの2.1.0リリースを使用します。
さらに、* Kafkaの実行にはhttps://zookeeper.apache.org/[Apache Zookeeper]が必要ですが、このチュートリアルでは、Kafkaにパッケージ化された単一ノードのZookeeperインスタンスを活用します。
公式ガイドに従ってZookeeperとKafkaをローカルで起動できたら、「メッセージ」という名前のトピックを作成できます。
 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic messages
上記のスクリプトはWindowsプラットフォーム用ですが、Unixライクなプラットフォームでも同様のスクリプトが利用可能です。

2.2. スパーク

Sparkは、HDFSおよびYARNにHadoopのクライアントライブラリを使用します。 その結果、これらすべての互換バージョンをアセンブルするのは非常に難しい場合があります*。 ただし、https://spark.apache.org/downloads.html [Sparkの公式ダウンロード]には、人気のあるバージョンのHadoopがあらかじめパッケージ化されています。 このチュートリアルでは、バージョン2.3.0パッケージ「Apache Hadoop 2.7以降用に事前にビルドされた」を使用します。
Sparkの適切なパッケージが解凍されると、利用可能なスクリプトを使用してアプリケーションを送信できます。 これは、後でSpring Bootでアプリケーションを開発するときに表示されます。

2.3. カサンドラ

DataStaxは、Windowsを含むさまざまなプラットフォーム向けにCassandraのコミュニティ版を提供しています。 これをローカルマシンに非常に簡単にダウンロードしてインストールできますhttps://academy.datastax.com/planet-cassandra//cassandra [公式ドキュメントに従って]。 バージョン3.9.0を使用します。
ローカルマシンにCassandraをインストールして起動したら、キースペースとテーブルの作成に進むことができます。 これは、インストールに同梱されているCQLシェルを使用して実行できます。
CREATE KEYSPACE vocabulary
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);
_vocabulary_と呼ばれる名前空間と、_words_と呼ばれる、_word_と_count_の2つの列を持つテーブルを作成したことに注意してください。

3. 依存関係

KafkaとSparkの依存関係をMavenを介してアプリケーションに統合できます。 これらの依存関係をMaven Centralから取得します。
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.11</artifactId>
    <version>1.5.2</version>
</dependency>
*これらの依存関係の一部は、スコープ内で_provided_としてマークされていることに注意してください。*これは、spark-submitを使用して実行するアプリケーションを送信するSparkインストールによって利用可能になるためです。

4. スパークストリーミング– Kafka統合戦略

この時点で、SparkとKafkaの統合戦略について簡単に説明する価値があります。
  • Kafkaは、バージョン0.8と0.10の間に新しいコンシューマAPIを導入しました。*したがって、対応するSpark Streamingパッケージは両方のブローカーバージョンで利用できます。 利用可能なブローカーと必要な機能に応じて適切なパッケージを選択することが重要です。

4.1。 スパークストリーミングカフカ0.8

0.8バージョンは、Receiverベースまたはダイレクトアプローチを使用するオプションを備えた安定した統合APIです。 https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html [公式ドキュメントで見つけることができる]これらのアプローチの詳細には触れません。 ここで注意すべき重要な点は、このパッケージがKafka Brokerバージョン0.8.2.1以上と互換性があることです。

4.2。 Spark Streaming Kafka 0.10

現在、これは実験的な状態であり、Kafka Brokerバージョン0.10.0以降とのみ互換性があります。 このパッケージは、直接アプローチのみを提供し、現在は新しいKafkaコンシューマAPI *を使用しています。 このhttps://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.htmlの詳細については、公式ドキュメントをご覧ください。 重要なのは、*古いKafka Brokerバージョンとの後方互換性がない*ことです。
このチュートリアルでは、0.10パッケージを使用することに注意してください。 前のセクションで言及した依存関係は、これのみを参照します。

5. データパイプラインの開発

Sparkを使用してJavaで簡単なアプリケーションを作成し、以前に作成したKafkaトピックと統合します。 アプリケーションは、投稿されたメッセージを読み取り、すべてのメッセージの単語の頻度をカウントします。 これは、前に作成したCassandraテーブルで更新されます。
データがどのように流れるかをすばやく視覚化しましょう。
link:/uploads/Simple-Data-Pipeline-1-100x20.jpg%20100w []
 

5.1. _ JavaStreamingContext _の取得

まず、すべてのSpark Streamingアプリケーションのエントリポイントである* _JavaStreamingContext_を初期化することから始めます*。
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

JavaStreamingContext streamingContext = new JavaStreamingContext(
  sparkConf, Durations.seconds(1));

5.2. Kafkaから_DStream_を取得する

これで、_JavaStreamingContext_からKafkaトピックに接続できます。
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");

JavaInputDStream<ConsumerRecord<String, String>> messages =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
ここでキーと値のデシリアライザーを提供することに注意してください。 * _String_などの一般的なデータ型の場合、デシリアライザーはデフォルトで使用可能です*。 ただし、カスタムデータ型を取得する場合は、カスタムデシリアライザーを提供する必要があります。
ここでは、Spark Streaming *によって提供される基本的な抽象化であるDiscreized Streamsまたは* DStreamsの実装である_JavaInputDStream_を取得しました。 内部的には、DStreamsは連続した一連のRDDにすぎません。

5.3. 取得した処理_DStream_

ここで、_JavaInputDStream_に対して一連の操作を実行して、メッセージ内の単語の頻度を取得します。
JavaPairDStream<String, String> results = messages
  .mapToPair(
      record -> new Tuple2<>(record.key(), record.value())
  );
JavaDStream<String> lines = results
  .map(
      tuple2 -> tuple2._2()
  );
JavaDStream<String> words = lines
  .flatMap(
      x -> Arrays.asList(x.split("\\s+")).iterator()
  );
JavaPairDStream<String, Integer> wordCounts = words
  .mapToPair(
      s -> new Tuple2<>(s, 1)
  ).reduceByKey(
      (i1, i2) -> i1 + i2
    );

5.4. Cassandraへの処理済み_DStream_の永続化

最後に、処理された_JavaPairDStream_を反復処理してCassandraテーブルに挿入できます。
wordCounts.foreachRDD(
    javaRdd -> {
      Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
      for (String key : wordCountMap.keySet()) {
        List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
        JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList);
        javaFunctions(rdd).writerBuilder(
          "vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
      }
    }
  );

5.5. アプリケーションを実行する

これはストリーム処理アプリケーションであるため、これを実行し続ける必要があります。
streamingContext.start();
streamingContext.awaitTermination();

6. チェックポイントの活用

ストリーム処理アプリケーションでは、*処理中のデータのバッチ間で状態を保持すると便利です*。
たとえば、以前の試みでは、現在の単語の頻度のみを保存できました。 代わりに累積頻度を保存する場合はどうなりますか? * Spark Streamingは、チェックポイントと呼ばれる概念を通じて可能にします。*
チェックポイントを活用するために、以前に作成したパイプラインを変更します。
link:/uploads/Data-Pipeline-With-Checkpoints-1-100x36.jpg%20100w []
データ処理のセッションにのみチェックポイントを使用することに注意してください。 これはフォールトトレランスを提供しません。 *ただし、チェックポイント設定はフォールトトレランスにも使用できます。*
チェックポイントを活用するには、アプリケーションにいくつかの変更を加える必要があります。 これには、_JavaStreamingContext_にチェックポイントの場所を提供することが含まれます。
streamingContext.checkpoint("./.checkpoint");
ここでは、ローカルファイルシステムを使用してチェックポイントを保存しています。 ただし、堅牢性のために、これはHDFS、S3、Kafkaなどの場所に保存する必要があります。 詳細については、https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#checkpointing [公式ドキュメント]で入手できます。
次に、マッピング関数を使用してすべてのパーティションを処理しながら、チェックポイントをフェッチし、単語の累積カウントを作成する必要があります。
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
  .mapWithState(
    StateSpec.function(
        (word, one, state) -> {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        }
      )
    );
累積単語数を取得したら、繰り返して、以前のようにCassandraに保存できます。
*データのチェックポイント設定はステートフル処理には便利ですが、レイテンシコストが発生します*。 したがって、最適なチェックポイント間隔とともにこれを賢く使用する必要があります。

7. オフセットについて

前に設定したKafkaパラメーターの一部を思い出すと:
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
これらは基本的に*オフセットの自動コミットを行いたくないため、コンシューマグループが初期化されるたびに最新のオフセットを選択することを意味します*。 したがって、アプリケーションは、実行中に投稿されたメッセージのみを消費できます。
アプリケーションが実行されているかどうかに関係なく、投稿されたすべてのメッセージを消費し、既に投稿されたメッセージを追跡する場合は、**オフセット状態を保存するとともにオフセットを適切に構成する必要があります**これは、このチュートリアルの範囲外です。
*これは、Spark Streamingが「exactly once」などの特定のレベルの保証を提供する方法でもあります。*これは、基本的に、Kafkaトピックに投稿された各メッセージがSpark Streamingによって1回だけ処理されることを意味します。

8. アプリケーションの展開

SparkのインストールにプリパックされているSpark-submitスクリプトを使用して*アプリケーションをデプロイできます。
$SPARK_HOME$\bin\spark-submit \
  --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
  --master local[2]
  \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Mavenを使用して作成するjarには、スコープ内で_provided_としてマークされていない依存関係が含まれている必要があることに注意してください。
このアプリケーションを送信し、前に作成したKafkaトピックにメッセージを投稿すると、前に作成したCassandraテーブルに累積単語数が投稿されていることがわかります。

9. 結論

まとめると、このチュートリアルでは、Kafka、Spark Streaming、Cassandraを使用して簡単なデータパイプラインを作成する方法を学びました。 また、Spark Streamingのチェックポイントを活用してバッチ間の状態を維持する方法も学びました。
いつものように、例のコードは入手可能ですhttps://github.com/eugenp/tutorials/tree/master/apache-spark[over on GitHub]。