1. 概要

Apache Flink は、Javaで簡単に使用できるストリーム処理フレームワークです。 Apache Kafka は、高いフォールトトレランスをサポートする分散ストリーム処理システムです。

このチュートリアルでは、これら2つのテクノロジーを使用してデータパイプラインを構築する方法を見ていきます。

2. インストール

Apache Kafkaをインストールして構成するには、公式ガイドを参照してください。 インストール後、次のコマンドを使用して、 flink_inputおよびflink_output:という新しいトピックを作成できます。

 bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_output

 bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_input

このチュートリアルでは、ApacheKafkaのデフォルト構成とデフォルトポートを使用します。

3. フリンクの使用法

Apache Flinkは、リアルタイムのストリーム処理テクノロジーを可能にします。 フレームワークでは、複数のサードパーティシステムをストリームソースまたはシンクとして使用できます

Flinkでは–さまざまなコネクタが利用可能です:

  • Apache Kafka(ソース/シンク)
  • Apache Cassandra(シンク)
  • Amazon Kinesis Streams(ソース/シンク)
  • Elasticsearch(シンク)
  • Hadoopファイルシステム(シンク)
  • RabbitMQ(ソース/シンク)
  • Apache NiFi(ソース/シンク)
  • TwitterストリーミングAPI(ソース)

Flinkをプロジェクトに追加するには、次のMaven依存関係を含める必要があります。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>

これらの依存関係を追加すると、Kafkaトピックとの間で消費および生成できるようになります。 Flinkの現在のバージョンは、 MavenCentralで見つけることができます。

4. カフカストリングコンシューマー

FlinkでKafkaからのデータを使用するには、トピックとKafkaアドレスを指定する必要があります。オフセットを保持するために使用されるグループIDも指定する必要があります。これにより、常に読み取りが行われるとは限りません。最初から全データ。

FlinkKafkaConsumerの作成を簡単にする静的メソッドを作成しましょう。

public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
  String topic, String kafkaAddress, String kafkaGroup ) {
 
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id",kafkaGroup);
    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
      topic, new SimpleStringSchema(), props);

    return consumer;
}

このメソッドは、 topic、kafkaAddress、、および kafkaGroup を取り、 FlinkKafkaConsumer を作成します。これにより、特定のトピックのデータがStringとして消費されます。 SimpleStringSchemaを使用してデータをデコードしました。

クラス名の番号011は、Kafkaバージョンを示します。

5. カフカストリングプロデューサー

Kafkaにデータを生成するには、使用するKafkaアドレスとトピックを提供する必要があります。ここでも、さまざまなトピックのプロデューサーを作成するのに役立つ静的メソッドを作成できます。

public static FlinkKafkaProducer011<String> createStringProducer(
  String topic, String kafkaAddress){

    return new FlinkKafkaProducer011<>(kafkaAddress,
      topic, new SimpleStringSchema());
}

Kafkaトピックを作成するときにグループIDを指定する必要がないため、このメソッドはtopickafkaAddressのみを引数として取ります。

6. 文字列ストリーム処理

完全に機能するコンシューマーとプロデューサーがあれば、Kafkaからのデータを処理して、結果をKafkaに保存することができます。 ストリーム処理に使用できる関数の完全なリストは、ここにあります。

この例では、各Kafkaエントリの単語を大文字にして、Kafkaに書き戻します。

この目的のために、カスタムMapFunctionを作成する必要があります。

public class WordsCapitalizer implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}

関数を作成した後、ストリーム処理で使用できます。

public static void capitalize() {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String address = "localhost:9092";
    StreamExecutionEnvironment environment = StreamExecutionEnvironment
      .getExecutionEnvironment();
    FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
    DataStream<String> stringInputStream = environment
      .addSource(flinkKafkaConsumer);

    FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
      outputTopic, address);

    stringInputStream
      .map(new WordsCapitalizer())
      .addSink(flinkKafkaProducer);
}

アプリケーションは、 flink_input トピックからデータを読み取り、ストリームに対して操作を実行してから、結果をKafkaのflink_outputトピックに保存します。

FlinkとKafkaを使用して文字列を処理する方法を見てきました。 ただし、多くの場合、カスタムオブジェクトに対して操作を実行する必要があります。 これを行う方法については、次の章で説明します。

7. カスタムオブジェクトの逆シリアル化

次のクラスは、送信者と受信者に関する情報を含む単純なメッセージを表します。

@JsonSerialize
public class InputMessage {
    String sender;
    String recipient;
    LocalDateTime sentAt;
    String message;
}

以前はSimpleStringSchemaを使用してKafkaからのメッセージを逆シリアル化していましたが、現在はデータをカスタムオブジェクトに直接逆シリアル化します。

これを行うには、カスタム DeserializationSchema:が必要です。

public class InputMessageDeserializationSchema implements
  DeserializationSchema<InputMessage> {

    static ObjectMapper objectMapper = new ObjectMapper()
      .registerModule(new JavaTimeModule());

    @Override
    public InputMessage deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, InputMessage.class);
    }

    @Override
    public boolean isEndOfStream(InputMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation<InputMessage> getProducedType() {
        return TypeInformation.of(InputMessage.class);
    }
}

ここでは、メッセージがKafkaでJSONとして保持されていると想定しています。

タイプLocalDateTimeのフィールドがあるため、 JavaTimeModule、を指定する必要があります。これは、LocalDateTimeオブジェクトのJSONへのマッピングを処理します。

すべての演算子(スキーマや関数など)はジョブの開始時にシリアル化されるため、Flinkスキーマにシリアル化できないフィールドを含めることはできません

ApacheSparkにも同様の問題があります。 この問題の既知の修正の1つは、上記の ObjectMapper で行ったように、フィールドをstaticとして初期化することです。 これは最も美しいソリューションではありませんが、比較的単純であり、機能します。

メソッドisEndOfStreamは、特定のデータが受信されるまでストリームを処理する必要がある特別な場合に使用できます。 しかし、私たちの場合は必要ありません。

8. カスタムオブジェクトのシリアル化

ここで、システムにメッセージのバックアップを作成する可能性を持たせたいと仮定しましょう。 プロセスを自動化する必要があり、各バックアップは1日中に送信されるメッセージで構成する必要があります。

また、バックアップメッセージには一意のIDを割り当てる必要があります。

この目的のために、次のクラスを作成できます。

public class Backup {
    @JsonProperty("inputMessages")
    List<InputMessage> inputMessages;
    @JsonProperty("backupTimestamp")
    LocalDateTime backupTimestamp;
    @JsonProperty("uuid")
    UUID uuid;

    public Backup(List<InputMessage> inputMessages, 
      LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
        this.uuid = UUID.randomUUID();
    }
}

UUID生成メカニズムは、重複を許可するため、完全ではないことに注意してください。 ただし、この例の範囲ではこれで十分です。

Backup オブジェクトをJSONとしてKafkaに保存したいので、SerializationSchemaを作成する必要があります。

public class BackupSerializationSchema
  implements SerializationSchema<Backup> {

    ObjectMapper objectMapper;
    Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

    @Override
    public byte[] serialize(Backup backupMessage) {
        if(objectMapper == null) {
            objectMapper = new ObjectMapper()
              .registerModule(new JavaTimeModule());
        }
        try {
            return objectMapper.writeValueAsString(backupMessage).getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to parse JSON", e);
        }
        return new byte[0];
    }
}

9. タイムスタンプメッセージ

毎日のすべてのメッセージのバックアップを作成するため、メッセージにはタイムスタンプが必要です。

Flinkは、 EventTime、ProcessingTime、、およびIngestionTimeの3つの異なる時間特性を提供します。

この場合、メッセージが送信された時刻を使用する必要があるため、EventTime。を使用します。

EventTime を使用するには、入力データからタイムスタンプを抽出するTimestampAssignerが必要です。

public class InputMessageTimestampAssigner 
  implements AssignerWithPunctuatedWatermarks<InputMessage> {
 
    @Override
    public long extractTimestamp(InputMessage element, 
      long previousElementTimestamp) {
        ZoneId zoneId = ZoneId.systemDefault();
        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(InputMessage lastElement, 
      long extractedTimestamp) {
        return new Watermark(extractedTimestamp - 1500);
    }
}

LocalDateTimeEpochSecondに変換する必要があります。これは、Flinkで期待される形式だからです。 タイムスタンプを割り当てた後、すべての時間ベースの操作は、sendAtフィールドからの時間を使用して操作します。

Flinkはタイムスタンプがミリ秒単位であると想定し、 toEpochSecond()は秒単位で時間を返すため、1000を掛ける必要があり、Flinkはウィンドウを正しく作成します。

Flinkはの概念を定義します透かし。 透かしは、送信された順序でデータが到着しない場合に役立ちます。 透かしは、要素の処理に許可される最大遅延を定義します。

透かしよりも低いタイムスタンプを持つ要素は、まったく処理されません。

10. タイムウィンドウの作成

バックアップが1日の間に送信されたメッセージのみを収集することを保証するために、ストリームで timeWindowAll メソッドを使用できます。これにより、メッセージがウィンドウに分割されます。

ただし、各ウィンドウからのメッセージを集約し、バックアップとして返す必要があります。

これを行うには、カスタムAggregateFunctionが必要です。

public class BackupAggregator 
  implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
 
    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(
      InputMessage inputMessage,
      List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        return new Backup(inputMessages, LocalDateTime.now());
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages,
      List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}

11. バックアップの集約

適切なタイムスタンプを割り当て、 AggregateFunction を実装した後、最終的にKafka入力を取得して処理できます。

public static void createBackup () throws Exception {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String kafkaAddress = "192.168.99.100:9092";
    StreamExecutionEnvironment environment
      = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
      = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
    flinkKafkaConsumer.setStartFromEarliest();

    flinkKafkaConsumer.assignTimestampsAndWatermarks(
      new InputMessageTimestampAssigner());
    FlinkKafkaProducer011<Backup> flinkKafkaProducer
      = createBackupProducer(outputTopic, kafkaAddress);

    DataStream<InputMessage> inputMessagesStream
      = environment.addSource(flinkKafkaConsumer);

    inputMessagesStream
      .timeWindowAll(Time.hours(24))
      .aggregate(new BackupAggregator())
      .addSink(flinkKafkaProducer);

    environment.execute();
}

12. 結論

この記事では、ApacheFlinkとApacheKafkaを使用して単純なデータパイプラインを作成する方法を紹介しました。

いつものように、コードはGithubにあります。