1. 概要

このチュートリアルでは、Apache Beamを紹介し、その基本的な概念を探ります。

まず、Apache Beamを使用するユースケースと利点を示し、次に基本的な概念と用語について説明します。 その後、ApacheBeamのすべての重要な側面を説明する簡単な例を見ていきます。

2. Apache Beamとは何ですか?

Apache Beam(Batch + strEAM)は、バッチおよびストリーミングデータ処理ジョブの統合プログラミングモデルです。データ処理パイプラインとそれらを実行するランナーを定義および構築するためのソフトウェア開発キットを提供します。

Apache Beamは、ポータブルプログラミングレイヤーを提供するように設計されています。実際、Beam Pipeline Runnerは、データ処理パイプラインをユーザーが選択したバックエンドと互換性のあるAPIに変換します。 現在、これらの分散処理バックエンドがサポートされています。

  • Apache Apex
  • Apache Flink
  • Apache Gearpump(インキュベーション)
  • Apache Samza
  • Apache Spark
  • Google Cloud Dataflow
  • ヘーゼルキャストジェット

3. なぜApacheBeamなのか?

Apache Beamは、バッチデータ処理とストリーミングデータ処理を融合しますが、他の多くの場合、別々のAPIを介して融合します。したがって、要件の変化に応じて、ストリーミングプロセスをバッチプロセスに、またはその逆に変更するのは非常に簡単です。

Apache Beamは、移植性と柔軟性を向上させます。基本的な詳細ではなく、ロジックに重点を置いています。 さらに、データ処理バックエンドはいつでも変更できます。

Apache Beamで利用できるJava、Python、Go、およびScalaSDKがあります。 実際、チームの全員が選択した言語でそれを使用できます。

4. 基本的な概念

Apache Beamを使用すると、ワークフローグラフ(パイプライン)を作成して実行できます。 プログラミングモデルの重要な概念は次のとおりです。

  • PCollection –固定バッチまたはデータストリームのデータセットを表します
  • PTransform –1つ以上のPCollection を取り、0個以上のPCollectionを出力するデータ処理操作
  • Pipeline PCollectionおよびPTransformの有向非巡回グラフを表すため、データ処理ジョブ全体をカプセル化します
  • PipelineRunner –指定された分散処理バックエンドでPipelineを実行します

簡単に言うと、 PipelineRunnerPipeline、を実行し、PipelinePCollectionPTransformで構成されます。

5. 単語数の例

Apache Beamの基本的な概念を学習したので、単語数のタスクを設計してテストしましょう。

5.1. ビームパイプラインの構築

ワークフローグラフの設計は、すべてのApacheBeamジョブの最初のステップです。 単語数タスクのステップを定義しましょう:

  1. ソースからテキストを読みます。
  2. テキストを単語のリストに分割します。
  3. すべての単語を小文字にします。
  4. 句読点を削除します。
  5. ストップワードをフィルタリングします。
  6. それぞれのユニークな単語を数えます。

これを実現するには、PCollectionおよびPTransformの抽象化を使用して、上記の手順を単一のPipelineに変換する必要があります。

5.2. 依存関係

ワークフローグラフを実装する前に、ApacheBeamのコア依存関係をプロジェクトに追加する必要があります。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>

ビームパイプラインランナーは、タスクを実行するために分散処理バックエンドに依存しています。 DirectRunnerを実行時の依存関係として追加しましょう。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
</dependency>

他のパイプラインランナーとは異なり、 DirectRunner は追加のセットアップを必要としないため、初心者に適しています。

5.3. 実装

Apache Beamは、Map-Reduceプログラミングパラダイム( Java Streams と同じ)を利用します。 実際、 reduce() filter() count() map()、続行する前に、flatMap()

Pipeline を作成することは、私たちが最初に行うことです。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

次に、6ステップの単語カウントタスクを適用します。

PCollection<KV<String, Long>> wordCount = p
    .apply("(1) Read all lines", 
      TextIO.read().from(inputFilePath))
    .apply("(2) Flatmap to a list of words", 
      FlatMapElements.into(TypeDescriptors.strings())
      .via(line -> Arrays.asList(line.split("\\s"))))
    .apply("(3) Lowercase all", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> word.toLowerCase()))
    .apply("(4) Trim punctuations", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> trim(word)))
    .apply("(5) Filter stopwords", 
      Filter.by(word -> !isStopWord(word)))
    .apply("(6) Count words", 
      Count.perElement());

apply()の最初の(オプションの)引数は String であり、コードを読みやすくするためだけのものです。 上記のコードで各apply()が行うことは次のとおりです。

  1. まず、TextIOを使用して入力テキストファイルを1行ずつ読み取ります。
  2. 各行を空白で分割し、単語のリストにフラットマップします。
  3. 単語数では大文字と小文字が区別されないため、すべての単語を小文字にします。
  4. 以前は、行を空白で分割し、「word!」のような単語で終わりました。 と「単語?」なので、句読点を削除します。
  5. 「is」や「by」などのストップワードは、ほとんどすべての英語のテキストで頻繁に使用されるため、削除します。
  6. 最後に、組み込み関数 Count.perElement()を使用して一意の単語をカウントします。

前述のように、パイプラインは分散バックエンドで処理されます。 PCollection は複数のバックエンドに分散されているため、メモリ内で反復処理することはできません。 代わりに、結果を外部データベースまたはファイルに書き込みます。

まず、PCollectionStringに変換します。 次に、TextIOを使用して出力を書き込みます。

wordCount.apply(MapElements.into(TypeDescriptors.strings())
    .via(count -> count.getKey() + " --> " + count.getValue()))
    .apply(TextIO.write().to(outputFilePath));

Pipeline の定義が完了したので、実行してテストできます。

5.4. 実行とテスト

これまで、単語数タスク用にPipelineを定義してきました。 この時点で、Pipelineを実行してみましょう。

p.run().waitUntilFinish();

このコード行で、ApacheBeamはタスクを複数のDirectRunnerインスタンスに送信します。 その結果、最後にいくつかの出力ファイルが生成されます。 次のようなものが含まれます。

...
apache --> 3
beam --> 5
rocks --> 2
...

Apache Beamでの分散ジョブの定義と実行は、これと同じくらい簡単で表現力豊かです。 比較のために、単語数の実装は、 Apache Spark Apache Flink 、および HazelcastJetでも利用できます。

6. ここからどこにいきますか?

入力ファイルから各単語を正常にカウントしましたが、最も頻繁に使用される単語のレポートはまだありません。 確かに、 PCollection の並べ替えは、次のステップとして解決するのに適した問題です。

後で、ウィンドウ処理、トリガー、メトリック、およびより高度な変換について詳しく知ることができます。 Apache Beamドキュメントは、詳細な情報と参考資料を提供します。

7. 結論

このチュートリアルでは、Apache Beamとは何か、そしてApacheBeamが他の方法よりも好まれる理由を学びました。 また、単語数の例を使用して、ApacheBeamの基本的な概念を示しました。

このチュートリアルのコードは、GitHubから入手できます。