1. 序章

Spring Cloud Data Flow は、コンポーザブルデータマイクロサービス向けのクラウドネイティブプログラミングおよびオペレーティングモデルです。

Spring Cloud Data Flow を使用すると、開発者は、データの取り込み、リアルタイム分析、データのインポート/エクスポートなどの一般的なユースケースのデータパイプラインを作成および調整できます。

このデータパイプラインには、ストリーミングとバッチデータパイプラインの2つの種類があります。

最初のケースでは、無制限の量のデータがメッセージングミドルウェアを介して消費または生成されます。 2番目のケースでは、短期間のタスクが有限のデータセットを処理してから終了します。

この記事では、ストリーミング処理に焦点を当てます。

2. アーキテクチャの概要

これらのタイプのアーキテクチャの主要なコンポーネントは、アプリケーションデータフローサーバー、およびターゲットランタイムです。

また、これらの主要コンポーネントに加えて、通常、アーキテクチャ内にデータフローシェルメッセージブローカーがあります。

これらすべてのコンポーネントを詳しく見ていきましょう。

2.1. アプリケーション

通常、ストリーミングデータパイプラインには、外部システムからのイベントの消費、データ処理、およびポリグロットの永続性が含まれます。 これらのフェーズは、 Spring Cloud の用語では、一般に Source Processor 、およびSinkと呼ばれます。

  • ソース: イベントを消費するアプリケーションです
  • プロセッサ:ソースからデータを消費し、処理を行い、処理されたデータをパイプライン内の次のアプリケーションに送信します
  • シンク:ソースまたはプロセッサーから消費し、データを目的の永続層に書き込みます

これらのアプリケーションは、次の2つの方法でパッケージ化できます。

  • Mavenリポジトリ、ファイル、http、またはその他のSpringリソース実装でホストされているSpring Boot uber-jar(この方法はこの記事で使用されます)
  • Docker

一般的なユースケース向けの多くのソース、プロセッサ、およびシンクアプリケーション(例: jdbc、hdfs、http、router)はすでに提供されており、 Spring Cloud DataFlowチームが使用できるようになっています。

2.2. ランタイム

また、これらのアプリケーションを実行するにはランタイムが必要です。 サポートされているランタイムは次のとおりです。

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • 開発用のローカルサーバー(この記事で使用されます)

2.3. データフローサーバー

ランタイムへのアプリケーションのデプロイを担当するコンポーネントは、データフローサーバーです。 ターゲットランタイムごとに、 Data FlowServer実行可能jarが提供されています。

データフローサーバーは、以下の解釈を担当します。

  • 複数のアプリケーションを介したデータの論理フローを記述するストリームDSL。
  • ランタイムへのアプリケーションのマッピングを説明するデプロイメントマニフェスト。

2.4. データフローシェル

データフローシェルは、データフローサーバーのクライアントです。シェルを使用すると、サーバーとの対話に必要なDSLコマンドを実行できます。

例として、httpソースからjdbcシンクへのデータのフローを記述するDSLは、「http | jdbc」。 DSL内のこれらの名前は、 Data Flow Server に登録され、MavenまたはDockerリポジトリでホストできるアプリケーションアーティファクトにマッピングされます。

Springは、ストリーミングデータパイプラインを作成および監視するためのFloという名前のグラフィカルインターフェイスも提供します。 ただし、その使用はこの記事の説明の範囲外です。

2.5. メッセージブローカー

前のセクションの例でを見てきたように、データフローの定義にパイプシンボルを使用しました。 パイプ記号は、メッセージングミドルウェアを介した2つのアプリケーション間の通信を表します。

これは、メッセージブローカーが必要であることを意味しますターゲット環境で稼働しています。

サポートされている2つのメッセージングミドルウェアブローカーは次のとおりです。

  • Apache Kafka
  • RabbitMQ

これで、アーキテクチャコンポーネントの概要がわかりました。最初のストリーム処理パイプラインを構築するときが来ました。

3. メッセージブローカーをインストールする

これまで見てきたように、パイプライン内のアプリケーションは、通信するためにメッセージングミドルウェアを必要とします。 この記事では、RabbitMQを使用します。

インストールの詳細については、公式サイトの指示に従ってください。

4. ローカルデータフローサーバー

アプリケーションの生成プロセスを高速化するために、 SpringInitializrを使用します。 その助けを借りて、Spring Bootアプリケーションを数分で入手できます。

Webサイトにアクセスしたら、グループアーティファクトの名前を選択するだけです。

これが完了したら、プロジェクトの生成ボタンをクリックして、Mavenアーティファクトのダウンロードを開始します。

ダウンロードが完了したら、プロジェクトを解凍し、選択したIDEにMavenプロジェクトとしてインポートします。

プロジェクトにMaven依存関係を追加しましょう。 Dataflow Local Server ライブラリが必要になるため、 spring-cloud-starter-dataflow-server-local依存関係を追加しましょう。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>

次に、 SpringBootメインクラスに@EnableDataFlowServerアノテーションを付ける必要があります。

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

それで全部です。 ローカルデータフローサーバーを実行する準備ができました。

mvn spring-boot:run

アプリケーションはポート9393で起動します。

5. データフローシェル

ここでも、Spring Initializrに移動し、GroupおよびArtifactの名前を選択します。

プロジェクトをダウンロードしてインポートしたら、 spring-cloud-dataflow-shell依存関係を追加しましょう。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>

次に、@EnableDataFlowShellアノテーションをSpringBootメインクラスに追加する必要があります。

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

これでシェルを実行できます。

mvn spring-boot:run

シェルの実行後、プロンプトに help コマンドを入力して、実行できるコマンドの完全なリストを確認できます。

6. ソースアプリケーション

同様に、Initializrで、単純なアプリケーションを作成し、 spring -cloud-starter-stream-rabbit:という StreamRabbit依存関係を追加します。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

次に、 @EnableBinding(Source.class)アノテーションをSpring Bootメインクラスに追加します。

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

次に、処理する必要のあるデータのソースを定義する必要があります。 このソースは、潜在的に無限のワークロード(モノのインターネットセンサーデータ、24時間年中無休のイベント処理、オンライントランザクションデータの取り込み)である可能性があります。

サンプルアプリケーションでは、 Poller を使用して10秒ごとに1つのイベント(簡単にするために新しいタイムスタンプ)を生成します。

@InboundChannelAdapter アノテーションは、メッセージのペイロードとして戻り値を使用して、ソースの出力チャネルにメッセージを送信します。

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

データソースの準備ができました。

7. プロセッサアプリケーション

次に、アプリケーションを作成し、 StreamRabbit依存関係を追加します。

次に、 @EnableBinding(Processor.class)アノテーションをSpring Bootメインクラスに追加します。

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

次に、ソースアプリケーションからのデータを処理するメソッドを定義する必要があります。

トランスフォーマーを定義するには、このメソッドに@Transformerアノテーションを付ける必要があります。

@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

タイムスタンプを「入力」チャネルから「出力」チャネルに送信されるフォーマットされた日付に変換します。

8. シンクアプリケーション

作成する最後のアプリケーションはSinkアプリケーションです。

ここでも、Spring Initializrに移動し、グループアーティファクトの名前を選択します。 プロジェクトをダウンロードした後、 StreamRabbit依存関係を追加しましょう。

次に、 @EnableBinding(Sink.class)アノテーションを SpringBootメインクラスに追加します。

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
	SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

次に、プロセッサアプリケーションからのメッセージを傍受する方法が必要です。

これを行うには、 @StreamListener(Sink.INPUT)アノテーションをメソッドに追加する必要があります。

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

このメソッドは、フォーマットされた日付に変換されたタイムスタンプをログファイルに出力するだけです。

9. ストリームアプリを登録する

Spring Cloudデータフローシェルを使用すると、 app register コマンドを使用して、ストリームアプリをAppRegistryに登録できます。

アプリアーティファクトに解決できる一意の名前、アプリケーションタイプ、およびURIを提供する必要があります。 タイプには、「 source 」、「 Processor 」、または「think」を指定します。

MavenスキームでURIを提供する場合、形式は次のとおりである必要があります。

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

Source Processor Sink アプリケーション以前に作成されたを登録するには、 Spring Cloud Data FlowShellに移動します。プロンプトから次のコマンドを発行します。

app register --name time-source --type source 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT

app register --name time-processor --type processor 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT

app register --name logging-sink --type sink 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT

10. ストリームを作成してデプロイする

新しいストリーム定義を作成するには、 Spring Cloud Data Flow Shell に移動し、次のシェルコマンドを実行します。

stream create --name time-to-log 
  --definition 'time-source | time-processor | logging-sink'

これは、DSL式‘time-source|に基づいてtime-to-logという名前のストリームを定義します。 タイムプロセッサ| ロギングシンク’

次に、ストリームをデプロイするには、次のシェルコマンドを実行します。

stream deploy --name time-to-log

Data Flow Server は、 time-source time-processor 、および logging-sink をMaven座標に解決し、それらを使用してストリームのtime-source time-processor 、およびlogging-sinkアプリケーション。

ストリームが正しくデプロイされている場合、データフローサーバーログに、モジュールが開始され、相互に関連付けられていることがわかります。

2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. 結果の確認

この例では、ソースは現在のタイムスタンプを毎秒メッセージとして送信し、プロセッサはそれをフォーマットし、ログシンクはロギングフレームワークを使用してフォーマットされたタイムスタンプを出力します。

ログファイルは、上記のように、 Data FlowServerのログ出力に表示されるディレクトリ内にあります。 結果を確認するには、ログを調整します。

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. 結論

この記事では、 Spring Cloud Data Flow を使用して、ストリーム処理用のデータパイプラインを構築する方法を説明しました。

また、ストリーム内の Source Processor Sink アプリケーションの役割と、このモジュールを DataFlowServer内に接続して接続する方法についても説明しました。 データフローシェルを使用した

サンプルコードはGitHubプロジェクトにあります。