1. 概要

Apache Kafka は、スケーラブルで高性能、低遅延のプラットフォームであり、はメッセージングシステムのようにデータのストリームの読み取りと書き込みを可能にします。 JavaのKafkaからかなり簡単に始めることができます。

Spark Streaming は、 Apache Spark プラットフォームの一部であり、は、データストリームのスケーラブルで高スループットのフォールトトレラント処理を可能にします。 Scalaで書かれていますが、Sparkはで動作するJavaAPIを提供します。

Apache Cassandra は、分散型のワイドカラムNoSQLデータストアです。 Cassandra の詳細については、前回の記事をご覧ください。

このチュートリアルでは、これらを組み合わせて、リアルタイムデータストリーム用の高度にスケーラブルでフォールトトレラントなデータパイプラインを作成します。

2. インストール

まず、アプリケーションを実行するために、Kafka、Spark、およびCassandraをマシンにローカルにインストールする必要があります。 これらのプラットフォームを使用してデータパイプラインを開発する方法を見ていきます。

ただし、チュートリアルをスムーズに実行するのに役立つすべてのインストールのポートを含むすべてのデフォルト構成を残します。

2.1. カフカ

ローカルマシンへのKafkaのインストールは非常に簡単で、公式ドキュメントの一部として見つけることができます。 Kafkaの2.1.0リリースを使用します。

さらに、KafkaではApacheZookeeperを実行する必要がありますが、このチュートリアルでは、Kafkaにパッケージ化された単一ノードのZookeeperインスタンスを利用します。

公式ガイドに従ってZookeeperとKafkaをローカルで起動できたら、「メッセージ」という名前のトピックの作成に進むことができます。

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic messages

上記のスクリプトはWindowsプラットフォーム用ですが、Unixライクなプラットフォームでも同様のスクリプトを使用できることに注意してください。

2.2. スパーク

Sparkは、HDFSとYARNにHadoopのクライアントライブラリを使用します。 したがって、これらすべての互換性のあるバージョンを組み立てるのは非常に難しい場合があります。 ただし、 Spark の公式ダウンロードには、人気のあるバージョンのHadoopがあらかじめパッケージ化されています。 このチュートリアルでは、「ApacheHadoop2.7以降用にビルド済み」のバージョン2.3.0パッケージを使用します。

Sparkの適切なパッケージが解凍されると、利用可能なスクリプトを使用してアプリケーションを送信できます。 これは、後でSpring Bootでアプリケーションを開発するときにわかります。

2.3. カサンドラ

DataStaxは、Windowsを含むさまざまなプラットフォーム用のCassandraのコミュニティエディションを利用できるようにします。 これは、公式ドキュメントに従って、ローカルマシンに非常に簡単にダウンロードしてインストールできます。 バージョン3.9.0を使用します。

ローカルマシンにCassandraをインストールして起動できたら、キースペースとテーブルの作成に進むことができます。 これは、インストールに付属しているCQLシェルを使用して実行できます。

CREATE KEYSPACE vocabulary
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);

vocabulary という名前空間と、wordsという2つの列を持つテーブルwordcountを作成したことに注意してください。

3. 依存関係

Mavenを介してKafkaとSparkの依存関係をアプリケーションに統合できます。 これらの依存関係をMavenCentralから取得します。

それに応じて、それらをpomに追加できます。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.11</artifactId>
    <version>1.5.2</version>
</dependency>

これらの依存関係の一部はスコープで提供されるものとしてマークされていることに注意してください。これは、spark-submitを使用して実行するアプリケーションを送信するSparkインストールによってこれらが利用可能になるためです。

4. Spark Streaming –Kafka統合戦略

この時点で、SparkとKafkaの統合戦略について簡単に説明する価値があります。

Kafkaはバージョン0.8と0.10の間に新しいコンシューマーAPIを導入しました。したがって、対応するSparkStreamingパッケージは両方のブローカーバージョンで利用できます。 利用可能なブローカーと必要な機能に応じて、適切なパッケージを選択することが重要です。

4.1. Spark Streaming Kafka 0.8

0.8バージョンは、ReceiverベースまたはDirectApproachを使用するオプションを備えた安定した統合APIです。 公式ドキュメントにあるこれらのアプローチの詳細については説明しません。 ここで注意すべき重要な点は、このパッケージがKafkaBrokerバージョン0.8.2.1以降と互換性があることです。

4.2. Spark Streaming Kafka 0.10

これは現在実験的な状態であり、KafkaBrokerバージョン0.10.0以降とのみ互換性があります。  このパッケージは、直接アプローチのみを提供し、新しいKafkaコンシューマーAPIを利用しています。 このの詳細については、公式ドキュメントを参照してください。 重要なのは、古いKafkaBrokerバージョンとの下位互換性がないことです。

このチュートリアルでは、0.10パッケージを使用することに注意してください。 前のセクションで説明した依存関係は、これのみを参照しています。

5. データパイプラインの開発

以前に作成したKafkaトピックと統合するSparkを使用して、Javaで簡単なアプリケーションを作成します。 アプリケーションは投稿されたメッセージを読み取り、すべてのメッセージの単語の頻度をカウントします。 これは、前に作成したCassandraテーブルで更新されます。

データがどのように流れるかをすばやく視覚化しましょう。

 

5.1. JavaStreamingContextの取得

まず、すべてのSparkStreamingアプリケーションのエントリポイントであるJavaStreamingContextを初期化することから始めます。

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

JavaStreamingContext streamingContext = new JavaStreamingContext(
  sparkConf, Durations.seconds(1));

5.2. KafkaからDStreamを取得しています

これで、JavaStreamingContextからKafkaトピックに接続できます。

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");

JavaInputDStream<ConsumerRecord<String, String>> messages = 
  KafkaUtils.createDirectStream(
    streamingContext, 
    LocationStrategies.PreferConsistent(), 
    ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

ここでは、キーと値のデシリアライザーを提供する必要があることに注意してください。 Stringなどの一般的なデータ型の場合、デフォルトでデシリアライザーを使用できます。 ただし、カスタムデータ型を取得する場合は、カスタムデシリアライザーを提供する必要があります。

ここでは、SparkStreamingによって提供される基本的な抽象化であるDiscretizedStreamsまたはDStreamsの実装であるJavaInputDStreamを取得しました。 内部的には、DStreamsはRDDの連続シリーズに他なりません。

5.3. 取得した処理DStream

次に、 JavaInputDStream で一連の操作を実行して、メッセージ内の単語の頻度を取得します。

JavaPairDStream<String, String> results = messages
  .mapToPair( 
      record -> new Tuple2<>(record.key(), record.value())
  );
JavaDStream<String> lines = results
  .map(
      tuple2 -> tuple2._2()
  );
JavaDStream<String> words = lines
  .flatMap(
      x -> Arrays.asList(x.split("\\s+")).iterator()
  );
JavaPairDStream<String, Integer> wordCounts = words
  .mapToPair(
      s -> new Tuple2<>(s, 1)
  ).reduceByKey(
      (i1, i2) -> i1 + i2
    );

5.4. 処理されたDStreamをCassandraに永続化する

最後に、処理された JavaPairDStream を反復処理して、Cassandraテーブルに挿入できます。

wordCounts.foreachRDD(
    javaRdd -> {
      Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
      for (String key : wordCountMap.keySet()) {
        List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
        JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList);
        javaFunctions(rdd).writerBuilder(
          "vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
      }
    }
  );

5.5. アプリケーションの実行

これはストリーム処理アプリケーションであるため、これを実行し続けたいと思います。

streamingContext.start();
streamingContext.awaitTermination();

6. チェックポイントの活用

ストリーム処理アプリケーションでは、処理中のデータのバッチ間で状態を保持すると便利なことがよくあります

たとえば、以前の試みでは、単語の現在の頻度しか保存できません。 代わりに累積度数を保存したい場合はどうなりますか? スパークストリーミングは、チェックポイントと呼ばれる概念を通じてそれを可能にします。

ここで、チェックポイントを活用するために、以前に作成したパイプラインを変更します。

チェックポイントは、データ処理のセッションにのみ使用されることに注意してください。 これはフォールトトレランスを提供しません。 ただし、チェックポインティングはフォールトトレランスにも使用できます。

チェックポイントを活用するには、アプリケーションにいくつかの変更を加える必要があります。 これには、JavaStreamingContextにチェックポイントの場所を提供することが含まれます。

streamingContext.checkpoint("./.checkpoint");

ここでは、ローカルファイルシステムを使用してチェックポイントを保存しています。 ただし、堅牢性を確保するために、これはHDFS、S3、Kafkaなどの場所に保存する必要があります。 詳細については、公式ドキュメントをご覧ください。

次に、マッピング関数を使用してすべてのパーティションを処理しながら、チェックポイントをフェッチして累積単語数を作成する必要があります。

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
  .mapWithState(
    StateSpec.function( 
        (word, one, state) -> {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        }
      )
    );

累積単語数を取得したら、前と同じように繰り返してCassandraに保存できます。

データチェックポインティングはステートフル処理に役立ちますが、レイテンシコストが伴うことに注意してください。 したがって、これを最適なチェックポイント間隔とともに賢く使用する必要があります。

7. オフセットを理解する

以前に設定したKafkaパラメータのいくつかを思い出すと:

kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

これらは基本的に、オフセットの自動コミットを望まず、コンシューマーグループが初期化されるたびに最新のオフセットを選択することを意味します。 したがって、アプリケーションは、実行中の期間中に投稿されたメッセージのみを使用できます。

アプリケーションが実行されているかどうかに関係なく投稿されたすべてのメッセージを消費し、すでに投稿されたメッセージを追跡したい場合は、オフセット状態を保存するとともにオフセットを適切に構成する必要がありますこれはこのチュートリアルの範囲から少し外れていますが。

これは、SparkStreamingが「正確に1回」などの特定のレベルの保証を提供する方法でもあります。これは基本的に、Kafkaトピックに投稿された各メッセージがSparkStreamingによって1回だけ処理されることを意味します。

8. アプリケーションのデプロイ

SparkインストールにあらかじめパックされているSpark送信スクリプトを使用して、アプリケーションをデプロイできます。

$SPARK_HOME$\bin\spark-submit \
  --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
  --master local[2] 
  \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Mavenを使用して作成するjarには、スコープ内で提供としてマークされていない依存関係が含まれている必要があることに注意してください。

このアプリケーションを送信し、前に作成したKafkaトピックにいくつかのメッセージを投稿すると、前に作成したCassandraテーブルに投稿された累積単語数が表示されます。

9. 結論

要約すると、このチュートリアルでは、Kafka、Spark Streaming、Cassandraを使用して単純なデータパイプラインを作成する方法を学びました。 また、Spark Streamingのチェックポイントを活用して、バッチ間の状態を維持する方法も学びました。

いつものように、例のコードはGitHubから入手できます。