1前書き

Spring Cloud Data Flowは、構成可能なデータマイクロサービス用のクラウドネイティブのプログラミングおよび運用モデルです。



Spring Cloud Data Flow


を使用すると、開発者はデータの取り込み、リアルタイム分析、データのインポート/エクスポートなどの一般的なユースケースのためにデータパイプラインを作成および調整できます。 。

このデータパイプラインには、ストリーミングデータパイプラインとバッチデータパイプラインの2種類があります。

前者の場合、無制限の量のデータがメッセージングミドルウェアを介して消費または生成されます。 2番目のケースでは、短期間のタスクが有限のデータセットを処理してから終了します。

この記事ではストリーミング処理に焦点を当てます。


2建築の概観

これらのアーキテクチャの主要コンポーネントは、

アプリケーション



データフローサーバー

、およびターゲットランタイムです。

また、これらの重要なコンポーネントに加えて、通常、アーキテクチャ内に

Data Flow Shell

および

message broker

もあります。

これらすべてのコンポーネントを詳細に見てみましょう。


2.1. アプリケーション

通常、ストリーミングデータパイプラインには、外部システムからのイベントの消費、データ処理、およびポリグロットの持続性が含まれます。これらのフェーズは、一般的に、

Spring Cloud

の用語で

Source



Processor

、および

Sink

と呼ばれています。


  • 出典:


  • Processor:



    Source

    からのデータを消費し、いくつかの処理を行います。

処理したデータをパイプラインの次のアプリケーションに送信します。


Sink:** は、

Source

または

Processor

から消費して、

目的の永続層へのデータの追加

これらのアプリケーションは、2つの方法でパッケージ化できます。

Mavenリポジトリ、ファイル、httpでホストされている** Spring Bootのuber-jar

または他のSpringリソース実装(このメソッドは
この記事)
** ドッカー

一般的なユースケース(jdbc、hdfs、http、routerなど)用の多くのソース、プロセッサ、およびシンクアプリケーションがすでに提供されており、

Spring Cloud Data Flow

チームによって使用する準備ができています。

** 2.2. ランタイム

**

また、これらのアプリケーションを実行するにはランタイムが必要です。サポートされているランタイムは次のとおりです。

  • クラウドファウンドリ

  • Apache YARN

クベルネテス

  • Apache Mesos

  • 開発用ローカルサーバー(この記事で使用されます)

** 2.3. データフローサーバ

**

アプリケーションをランタイムにデプロイする役割を担うコンポーネントは、

Data Flow Server

です。各ターゲットランタイムに提供される

Data Flow Server

実行可能jarファイルがあります。

Data Flow Serverは、次の解釈を担当します。

  • 複数の要素を介したデータの論理的な流れを記述するストリームDSL

アプリケーション

  • アプリケーションのマッピングを記述するデプロイメントマニフェスト

ランタイム


2.4. データフローシェル

データフローシェルは、データフローサーバーのクライアントです。

例として、httpソースからjdbcシンクへのデータの流れを記述するためのDSLは、「http | http |」と記述されます。 jdbc」 DSL内のこれらの名前は

Data Flow Server

に登録され、MavenまたはDockerリポジトリでホストできるアプリケーション成果物にマッピングされます。

Springはまた、ストリーミングデータパイプラインを作成および監視するための

Flo

という名前のグラフィカルインターフェイスも提供しています。ただし、その使用はこの記事の説明の範囲外です。

** 2.5. メッセージブローカー

**

セクションで見たように、データの流れの定義にパイプ記号を使用しました。パイプ記号は、メッセージングミドルウェアを介した2つのアプリケーション間の通信を表します。

サポートされている2つのメッセージングミドルウェアブローカーは以下のとおりです。

  • Apache Kafka

  • うさぎMQ

これで、アーキテクチャコンポーネントの概要がわかりました。** 最初のストリーム処理パイプラインを構築する時が来ました。

** 3メッセージブローカをインストールする

**

これまで見てきたように、パイプラインのアプリケーションは通信するためにメッセージングミドルウェアを必要とします。この記事の目的のために、私たちは

RabbitMQ

を使います。

インストールの詳細については、https://www.rabbitmq.com/download.html[公式サイト]の指示に従ってください。


4ローカルデータフローサーバー

アプリケーションの生成プロセスをスピードアップするために、http://start.spring.io/[Spring Initializr]を使用します。その助けを借りて、私たちは数分で私たちの

Spring Boot

アプリケーションを入手することができます。

Webサイトにアクセスしたら、

Group



Artifact

の名前を選択します。

これが完了したら、

プロジェクトの生成

ボタンをクリックしてMaven成果物のダウンロードを開始します。

リンク:/uploads/init.jpg%201365w[]

ダウンロードが完了したら、プロジェクトを解凍し、選択したIDEにMavenプロジェクトとしてインポートします。

プロジェクトにMavenの依存関係を追加しましょう。

Dataflow Local Server

ライブラリが必要になるので、https://search.maven.org/search?q=spring-cloud-starter-dataflow-server-local[spring-cloud-starter-dataflow-server-local]を追加しましょう依存関係:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>

今度は

Spring Boot

メインクラスに

@ EnableDataFlowServer

アノテーションを付ける必要があります。

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[]args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

それで全部です。

ローカルデータフローサーバー

を実行する準備が整いました。

mvn spring-boot:run

アプリケーションはポート9393で起動します。


5データフローシェル

もう一度、Spring Initializrに行き、

Group



Artifact

の名前を選択してください。

プロジェクトをダウンロードしてインポートしたら、https://search.maven.org/search?q=a:spring-cloud-dataflow-shell[spring-cloud-dataflow-shell]依存関係を追加しましょう。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>

次に、

Spring Boot

メインクラスに

@ EnableDataFlowShell

アノテーションを追加する必要があります。

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

    public static void main(String[]args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

これでシェルを実行できます。

mvn spring-boot:run

シェルの実行後、プロンプトに

help

コマンドを入力して、実行できるコマンドの完全なリストを確認できます。


6. ソースアプリケーション

同様に、Initializrでは、簡単なアプリケーションを作成し、https://search.maven.org/search?q=a:spring-cloud-starter-stream-rabbit[spring-cloud-という

Stream Rabbit

依存関係を追加します。 starter-stream-rabbit:]

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

次に、

Spring Boot

メインクラスに

@ EnableBinding(Source.class)

アノテーションを追加します。

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {

    public static void main(String[]args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

今度は処理しなければならないデータのソースを定義する必要があります。

このソースは、無限の可能性のあるワークロード(インターネットのセンサーデータ、24時間365日のイベント処理、オンライントランザクションデータ取り込み)の可能性があります。

  • サンプルアプリケーションでは、

    Poller

    を使用して10秒ごとに1つのイベント(単純にするために新しいタイムスタンプ)を生成します。


@ InboundChannelAdapter

アノテーションは、戻り値をメッセージのペイロードとして使用して、メッセージをソースの出力チャネルに送信します。

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT,
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

私たちのデータソースは準備ができています。

** 7. プロセッサアプリケーション

**

次に、アプリケーションを作成し、

Stream Rabbit

依存関係を追加します。

次に、

Spring Boot

メインクラスに

@ EnableBinding(Processor.class)

アノテーションを追加します。

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[]args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

次に、ソースアプリケーションから来るデータを処理するためのメソッドを定義する必要があります。

トランスフォーマを定義するには、このメソッドに

@ Transformer

アノテーションを付ける必要があります。

@Transformer(inputChannel = Processor.INPUT,
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

タイムスタンプを ‘input’チャンネルから ‘output’チャンネルに送信されるフォーマットされた日付に変換します。

** 8シンクアプリケーション+

**

最後に作成するアプリケーションはSinkアプリケーションです。

もう一度、Spring Initializrに行き、

Group



Artifact

の名前を選択します。プロジェクトをダウンロードしたら、

Stream Rabbit

依存関係を追加しましょう。

次に、

Spring Boot

メインクラスに

@ EnableBinding(Sink.class)

アノテーションを追加します。

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[]args) {
    SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

今度は、プロセッサアプリケーションからのメッセージを傍受するためのメソッドが必要です。

これを行うには、

@ StreamListener(Sink.INPUT)

アノテーションをメソッドに追加する必要があります。

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

このメソッドは、フォーマットされた日付に変換されたタイムスタンプをログファイルに単純に出力します。

** 9ストリームアプリを登録する

**

Spring Cloud Data Flow Shellでは、

app register

コマンドを使用してStream AppをApp Registryに登録できます。

アプリアーティファクトに解決できる一意の名前、アプリケーションの種類、およびURIを提供する必要があります。タイプには、 ”

source

“、 ”

processor

“、または ”

sink

“を指定します。

Maven方式でURIを提供する場合、フォーマットは以下に準拠する必要があります。

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>


Source



Processor

、および

Sink

アプリケーションを登録するには、

Spring Cloud Data Flow Shell

に移動して、プロンプトから次のコマンドを発行します。

app register --name time-source --type source
  --uri maven://org.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT

app register --name time-processor --type processor
  --uri maven://org.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT

app register --name logging-sink --type sink
  --uri maven://org.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT


10ストリームを作成してデプロイする

新しいストリーム定義を作成するには、

Spring Cloud Data Flow Shell

に移動し、次のシェルコマンドを実行します。

stream create --name time-to-log
  --definition 'time-source | time-processor | logging-sink'

これは、DSL表現に基づいて

time-to-log

という名前のストリームを定義します。タイムプロセッサlogging-sink’__

その後、ストリームをデプロイするために、次のシェルコマンドを実行します。

stream deploy --name time-to-log


Data Flow Server



time-source



time-processor

、および

logging-sink

を解決して座標を作成し、それらを使用してストリームの

time-source



time-processor

、および

logging-sink

アプリケーションを起動します。

ストリームがデプロイされた場合、

Data Flow Server

のログに、モジュールが開始され、結び付けられていることが記録されます。

2016-08-24 12:29:10.516  INFO 8096 ---[io-9393-exec-10]o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH__TO__LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 ---[io-9393-exec-10]o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH__TO__LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 ---[io-9393-exec-10]o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH__TO__LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

** 11結果を確認する

**

この例では、ソースは単に現在のタイムスタンプを毎秒メッセージとして送信し、プロセッサはそれをフォーマットし、ログシンクはロギングフレームワークを使用してフォーマットされたタイムスタンプを出力します。

ログファイルは、上記のように、

Data Flow Server

のログ出力に表示されるディレクトリ内にあります。結果を見るために、ログを調整することができます。

tail -f PATH__TO__LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout__0.log
2016-08-24 12:40:42.029  INFO 9488 ---[r.time-to-log-1]s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 ---[r.time-to-log-1]s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 ---[r.time-to-log-1]s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21


12. 結論

この記事では、

Spring Cloud Data Flow

を使用してストリーム処理用のデータパイプラインを構築する方法を説明しました。

また、ストリーム内の

Source



Processor

、および

Sink

アプリケーションの役割、および

Data Flow Shell

を使用して

Data Flow Server

内にこのモジュールをプラグインして結び付ける方法も説明しました。

サンプルコードはhttps://github.com/eugenp/tutorials/tree/master/spring-cloud-data-flow[GitHubプロジェクト]にあります。