1. 概要

Apache Flinkは、プログラマーが非常に効率的かつスケーラブルな方法で大量のデータを処理できるようにするビッグデータ処理フレームワークです。

この記事では、Apache Flink JavaAPIで利用可能なコアAPIの概念と標準データ変換のいくつかを紹介します。 このAPIの流暢なスタイルにより、Flinkの中心的な構成要素である分散コレクションを簡単に操作できます。

まず、Flinkの DataSet API変換を見て、それらを使用して単語数プログラムを実装します。 次に、Flinkの DataStream APIについて簡単に説明します。これにより、イベントのストリームをリアルタイムで処理できます。

2. Mavenの依存関係

開始するには、Mavenの依存関係を flink-javaおよびflink-test-utilsライブラリに追加する必要があります。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test<scope>
</dependency>

3. コアAPIの概念

Flinkを使用する場合、APIに関連するいくつかのことを知っておく必要があります。

  • すべてのFlinkプログラムは、分散されたデータのコレクションに対して変換を実行します。フィルタリング、マッピング、結合、グループ化、集約など、データを変換するためのさまざまな機能が提供されます。
  • Flinkでのシンク操作により、ストリームの実行がトリガーされ、結果をファイルシステムに保存したり、標準出力に出力したりするなど、プログラムの目的の結果が生成されます
  • フリンク変換は遅延です。つまり、シンク操作が呼び出されるまで実行されません。
  • Apache Flink APIは、バッチとリアルタイムの2つの操作モードをサポートしています。 バッチモードで処理できる限られたデータソースを扱っている場合は、 DataSet API。 無制限のデータストリームをリアルタイムで処理する場合は、 DataStreamAPIを使用する必要があります

4. DataSetAPI変換

Flinkプログラムへのエントリポイントは、 ExecutionEnvironment クラスのインスタンスです。これは、プログラムが実行されるコンテキストを定義します。

ExecutionEnvironment を作成して、処理を開始しましょう。

ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();

ローカルマシンでアプリケーションを起動すると、ローカルJVMで処理が実行されることに注意してください。 マシンのクラスターで処理を開始する場合は、インストールする必要があります Apache Flink それらのマシンで、 実行環境によると。

4.1. データセットの作成

データ変換の実行を開始するには、プログラムにデータを提供する必要があります。

ExecutionEnvironement を使用して、DataSetクラスのインスタンスを作成しましょう。

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

DataSet は、Apache Kafka、CSV、ファイル、または事実上他の任意のデータソースなどの複数のソースから作成できます。

4.2. フィルタリングして削減

DataSet クラスのインスタンスを作成したら、それに変換を適用できます。

特定のしきい値を超える数値をフィルタリングして、次にそれらをすべて合計するとします。あなたは使用することができますフィルター()減らす() これを達成するための変換:

int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);

collect()メソッドは、実際のデータ変換をトリガーするシンク操作であることに注意してください。

4.3. 地図

PersonオブジェクトのDataSetがあるとします。

private static class Person {
    private int age;
    private String name;

    // standard constructors/getters/setters
}

次に、これらのオブジェクトのDataSetを作成しましょう。

DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));

コレクションのすべてのオブジェクトからageフィールドのみを抽出するとします。 map()変換を使用して、Personクラスの特定のフィールドのみを取得できます。

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);

4.4. 加入

2つのデータセットがある場合は、それらをidフィールドに結合することをお勧めします。 このために、 join()変換を使用できます。

ユーザーのトランザクションとアドレスのコレクションを作成しましょう。

Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions 
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

両方のタプルの最初のフィールドはIntegerタイプであり、これは両方のデータセットを結合するidフィールドです。

実際の結合ロジックを実行するには、アドレスとトランザクション用のKeySelectorインターフェイスを実装する必要があります。

private static class IdKeySelectorTransaction 
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress 
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}

各セレクターは、結合を実行する必要があるフィールドのみを返します。

残念ながら、Flinkにはジェネリック型の情報が必要なため、ここでラムダ式を使用することはできません。

次に、これらのセレクターを使用してマージロジックを実装しましょう。

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

4.5. 選別

次のTuple2のコレクションがあるとします。

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);

このコレクションをタプルの最初のフィールドでソートする場合は、 sortPartitions()変換を使用できます。

List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

5. 単語数

単語数の問題は、ビッグデータ処理フレームワークの機能を紹介するために一般的に使用される問題です。 基本的な解決策は、テキスト入力での単語の出現をカウントすることです。 Flinkを使用して、この問題の解決策を実装しましょう。

ソリューションの最初のステップとして、入力をトークン(単語)に分割する LineSplitter クラスを作成し、トークンごとにキーと値のペアのTuple2を収集します。 これらのタプルのそれぞれで、キーはテキストで見つかった単語であり、値は整数1です。

このクラスは、 FlatMapFunction 取るインターフェース入力として、を生成しますタプル2

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}

Collectorクラスでcollect()メソッドを呼び出して、処理パイプラインでデータを転送します。

次の最後のステップは、タプルを最初の要素(単語)でグループ化し、2番目の要素で sum 集計を実行して、単語の出現回数を生成することです。

public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}

flatMap() groupBy()、および Aggregate()の3種類のFlink変換を使用しています。

単語数の実装が期待どおりに機能していることを確認するためのテストを作成しましょう。

List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();
 
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

6. DataStream API

6.1. データストリームの作成

Apache Flinkは、DataStreamAPIを介したイベントのストリームの処理もサポートしています。 イベントの消費を開始する場合は、最初にStreamExecutionEnvironmentクラスを使用する必要があります。

StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();

次に、さまざまなソースからの executeEnvironment を使用して、イベントのストリームを作成できます。 Apache Kafka のようなメッセージバスの場合もありますが、この例では、いくつかの文字列要素からソースを作成します。

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is a first sentence", 
  "This is a second sentence with a one word");

通常のDataSetクラスのように、DataStreamのすべての要素に変換を適用できます。

SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);

実行をトリガーするには、 print()などのシンク操作を呼び出す必要があります。これは、 execute()メソッドに続いて、変換の結果を標準出力に出力するだけです。 StreamExecutionEnvironment クラス:

upperCase.print();
env.execute();

次の出力が生成されます。

1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. イベントのウィンドウ処理

イベントのストリームをリアルタイムで処理する場合、イベントをグループ化し、それらのイベントのウィンドウに計算を適用する必要がある場合があります。

イベントのストリームがあり、各イベントがイベント番号とイベントがシステムに送信されたときのタイムスタンプで構成されるペアであり、順序が狂っているイベントを許容できると仮定します。 20秒以上遅れています。

この例では、最初に、数分間隔の2つのイベントをシミュレートするストリームを作成し、遅延しきい値を指定するタイムスタンプエクストラクターを定義しましょう。

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = env.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
 
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          return element.f1 * 1000;
        }
    });

次に、イベントを5秒のウィンドウにグループ化し、それらのイベントに変換を適用するウィンドウ操作を定義しましょう。

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();

5秒ごとのウィンドウの最後の要素を取得するため、次のように出力されます。

1> (15,1491221519)

2番目のイベントは、指定された遅延しきい値より遅れて到着したため、表示されないことに注意してください。

7. 結論

この記事では、Apache Flinkフレームワークを紹介し、そのAPIで提供されるいくつかの変換について説明しました。

Flinkの流暢で機能的なDataSetAPIを使用して単語数プログラムを実装しました。 次に、DataStream APIを確認し、イベントのストリームに単純なリアルタイム変換を実装しました。

これらすべての例とコードスニペットの実装は、GitHubにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。