1. 概要

Spring Cloud Data Flow は、リアルタイムのデータパイプラインとバッチプロセスを構築するためのクラウドネイティブツールキットです。 Spring Cloud Data Flowは、単純なインポート/エクスポート、ETL処理、イベントストリーミング、予測分析など、さまざまなデータ処理のユースケースですぐに使用できます。

このチュートリアルでは、JDBCデータベースからデータを抽出し、それを単純なPOJOに変換して、MongoDBにロードするストリームパイプラインを使用したリアルタイムの抽出変換およびロード(ETL)の例を学習します。

2. ETLとイベントストリーム処理

ETL –抽出、変換、およびロード–は、一般に、複数のデータベースおよびシステムから共通のデータウェアハウスにデータをバッチロードするプロセスと呼ばれていました。 このデータウェアハウスでは、システムの全体的なパフォーマンスを損なうことなく、大量のデータ分析処理を実行できます。

しかし、新しいトレンドがこれを行う方法を変えています。 ETLには、データウェアハウスとデータレイクにデータを転送する役割があります。

現在、これは、Spring Cloud Data Flow を使用して、イベントストリームアーキテクチャのストリームで実行できます。

3. SpringCloudデータフロー

Spring Cloud Data Flow(SCDF)を使用すると、開発者は2つのフレーバーでデータパイプラインを作成できます。

  • SpringCloudStreamを使用した長寿命のリアルタイムストリームアプリケーション
  • SpringCloudTaskを使用した短期間のバッチタスクアプリケーション

この記事では、Spring Cloudストリームに基づく最初の長寿命のストリーミングアプリケーションについて説明します。

3.1. Springクラウドストリームアプリケーション

SCDFストリームパイプラインはステップで構成されます。ここで各ステップは、Spring CloudStreamマイクロフレームワークを使用してSpringBootスタイルで構築されたアプリケーションです。これらのアプリケーションは、次のようなメッセージングミドルウェアによって統合されます。 ApacheKafkaまたはRabbitMQ。

これらのアプリケーションは、ソース、プロセッサ、およびシンクに分類されます。 ETLプロセスと比較すると、ソースは「抽出」、プロセッサは「トランス」、シンクは「ロード」の部分であると言えます。

場合によっては、パイプラインの1つ以上のステップでアプリケーションスターターを使用できます。 これは、ステップに新しいアプリケーションを実装する必要がないことを意味しますが、代わりに、すでに実装されている既存のアプリケーションスターターを構成します。

アプリケーションスターターのリストはここにあります。

3.2. SpringCloudデータフローサーバー

アーキテクチャの最後の部分はSpringCloudData FlowServerです。 SCDFサーバーは、Spring Cloud Deployer仕様を使用して、アプリケーションとパイプラインストリームのデプロイを行います。 この仕様は、Kubernetes、Apache Mesos、Yarn、Cloud Foundryなど、さまざまな最新のランタイムにデプロイすることで、SCDFクラウドネイティブフレーバーをサポートします。

また、ストリームをローカル展開として実行することもできます。

SCDFアーキテクチャの詳細については、ここを参照してください。

4. 環境設定

始める前に、この複雑な展開の一部を選択する必要があります。 最初に定義するのはSCDFサーバーです。

テストには、ローカル開発にSCDFサーバーローカルを使用します。 実稼働環境では、後で SCDF ServerKubernetesなどのクラウドネイティブランタイムを選択できます。 サーバーランタイムのリストはここにあります。

それでは、このサーバーを実行するためのシステム要件を確認しましょう。

4.1. システム要求

SCDFサーバーを実行するには、2つの依存関係を定義して設定する必要があります。

  • メッセージングミドルウェア、および
  • RDBMS。

メッセージングミドルウェアの場合、 RabbitMQを使用し、パイプラインストリーム定義を格納するためのRDBMSとしてPostgreSQLを選択します。

RabbitMQを実行するには、最新バージョンここをダウンロードし、デフォルト構成を使用してRabbitMQインスタンスを開始するか、次のDockerコマンドを実行します。

docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

最後のセットアップ手順として、デフォルトのポート5432にPostgreSQLRDBMSをインストールして実行します。 この後、次のスクリプトを使用して、SCDFがストリーム定義を格納できるデータベースを作成します。

CREATE DATABASE dataflow;

4.2. SpringCloudデータフローサーバーローカル

SCDFサーバーローカルを実行するために、docker-composeを使用してサーバーを起動するか、またはJavaアプリケーションとして起動するかを選択できます。

ここでは、SCDFサーバーローカルをJavaアプリケーションとして実行します。アプリケーションを構成するには、構成をJavaアプリケーションパラメーターとして定義する必要があります。 システムパスにJava8が必要です。

jarと依存関係をホストするには、SCDFサーバーのホームフォルダーを作成し、SCDFサーバーローカルディストリビューションをこのフォルダーにダウンロードする必要があります。 SCDFサーバーローカルの最新のディストリビューションはここからダウンロードできます。

また、libフォルダーを作成し、そこにJDBCドライバーを配置する必要があります。 PostgreSQLドライバーの最新バージョンはこちらで入手できます。 

最後に、SCDFローカルサーバーを実行してみましょう。

$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
    --spring.datasource.username=postgres_username \
    --spring.datasource.password=postgres_password \
    --spring.datasource.driver-class-name=org.postgresql.Driver \
    --spring.rabbitmq.host=127.0.0.1 \
    --spring.rabbitmq.port=5672 \
    --spring.rabbitmq.username=guest \
    --spring.rabbitmq.password=guest

次のURLを確認することで、実行されているかどうかを確認できます。

http:// localhost:9393 / dashboard

4.3. SpringCloudデータフローシェル

SCDFシェルはコマンドラインツールであり、アプリケーションとパイプラインの作成と展開を簡単に行うことができます。 これらのシェルコマンドは、Spring Cloud Data Flow Server RESTAPI上で実行されます。

最新バージョンのjarをSCDFホームフォルダにダウンロードします。ここから入手できます。完了したら、次のコマンドを実行します(必要に応じてバージョンを更新します)。

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
  ____                              ____ _                __
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/


Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

最後の行に「dataflow:>」の代わりに「 server-unknown:>」が表示される場合は、ローカルホストでSCDFサーバーを実行していません。 この場合、次のコマンドを実行して別のホストに接続します。

server-unknown:>dataflow config server http://{host}

これで、シェルがSCDFサーバーに接続され、コマンドを実行できるようになりました。

Shellで最初に行う必要があるのは、アプリケーションスターターをインポートすることです。 SpringBoot2.0.xのRabbitMQ+Mavenの最新バージョンここを見つけて、次のコマンドを実行します(ここでも、バージョンを更新します。必要に応じて、ここ「 Darwin-SR1 」) :

$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

インストールされているアプリケーションを確認するには、次のシェルコマンドを実行します。

$ dataflow:> app list

その結果、インストールされているすべてのアプリケーションを含むテーブルが表示されます。

また、SCDFは、 Flo という名前のグラフィカルインターフェイスを提供します。このインターフェイスには、 http:// localhost:9393 /dashboardでアクセスできます。  ただし、その使用はこの記事の範囲内ではありません。

5. ETLパイプラインの作成

次に、ストリームパイプラインを作成しましょう。 これを行うために、JDBCソースアプリケーションスターターを使用して、リレーショナルデータベースから情報を抽出します。

また、情報構造を変換するためのカスタムプロセッサと、データをMongoDBにロードするためのカスタムシンクを作成します。

5.1. 抽出–抽出用のリレーショナルデータベースの準備

crmという名前のデータベースとcustomerという名前のテーブルを作成しましょう。

CREATE DATABASE crm;
CREATE TABLE customer (
    id bigint NOT NULL,
    imported boolean DEFAULT false,
    customer_name character varying(50),
    PRIMARY KEY(id)
)

フラグimportedを使用していることに注意してください。このフラグは、すでにインポートされているレコードを格納します。 必要に応じて、この情報を別のテーブルに保存することもできます。

それでは、いくつかのデータを挿入しましょう。

INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2. 変換– JDBCフィールドのMongoDBフィールド構造へのマッピング

変換ステップでは、フィールドcustomer_nameをソーステーブルから新しいフィールドnameに簡単に変換します。 ここで他の変換を行うこともできますが、例は短くしておきましょう。

これを行うには、customer-transformという名前の新しいプロジェクトを作成します。これを行う最も簡単な方法は、 SpringInitializrサイトを使用してプロジェクトを作成することです。 Webサイトにアクセスしたら、グループとアーティファクトの名前を選択します。 com.customercustomer-transform、をそれぞれ使用します。

これが完了したら、「プロジェクトの生成」ボタンをクリックしてプロジェクトをダウンロードします。 次に、プロジェクトを解凍してお気に入りのIDEにインポートし、次の依存関係をpom.xmlに追加します。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

これで、フィールド名変換のコーディングを開始する準備が整いました。 これを行うには、アダプターとして機能するCustomerクラスを作成します。 このクラスは顧客名経由 setName() メソッドを介してその値を出力します getName 方法

@JsonProperty アノテーションは、JSONからJavaへの逆シリアル化中に変換を実行します。

public class Customer {

    private Long id;

    private String name;

    @JsonProperty("customer_name")
    public void setName(String name) {
        this.name = name;
    }

    @JsonProperty("name")
    public String getName() {
        return name;
    }

    // Getters and Setters
}

プロセッサは、入力からデータを受信し、変換を実行して、結果を出力チャネルにバインドする必要があります。 これを行うためのクラスを作成しましょう:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Customer convertToPojo(Customer payload) {

        return payload;
    }
}

上記のコードでは、変換が自動的に行われることを確認できます。 入力はJSONとしてデータを受け取り、Jacksonはsetメソッドを使用してデータをCustomerオブジェクトに逆シリアル化します。

反対は出力の場合で、データはgetメソッドを使用してJSONにシリアル化されます。

5.3. ロード–MongoDBでシンク

変換ステップと同様に、別のMavenプロジェクトを作成します。名前はcustomer-mongodb-sinkです。 ここでも、 Spring Initializr にアクセスし、グループの場合は com.customer を選択し、アーティファクトの場合はcustomer-mongodb-sinkを選択します。 次に、依存関係の検索ボックスに MongoDB と入力し、プロジェクトをダウンロードします。

次に、解凍してお気に入りのIDEにインポートします。

次に、customer-transformプロジェクトと同じ追加の依存関係を追加します。

次に、このステップで入力を受け取るために、別のCustomerクラスを作成します。

import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="customer")
public class Customer {

    private Long id;
    private String name;

    // Getters and Setters
}

Customer をシンクするために、CustomerRepositoryを使用して顧客エンティティを保存するListenerクラスを作成します。

@EnableBinding(Sink.class)
public class CustomerListener {

    @Autowired
    private CustomerRepository repository;

    @StreamListener(Sink.INPUT)
    public void save(Customer customer) {
        repository.save(customer);
    }
}

また、この場合の CustomerRepository は、SpringDataのMongoRepositoryです。

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {

}

5.4. ストリーム定義

これで、両方のカスタムアプリケーションをSCDFサーバーに登録する準備が整いました。これを行うには、Mavenコマンド mvninstallを使用して両方のプロジェクトをコンパイルします。

次に、Spring Cloud DataFlowShellを使用してそれらを登録します。

app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

最後に、アプリケーションがSCDFに保存されているかどうかを確認し、シェルでapplicationlistコマンドを実行します。

app list

結果として、結果の表に両方のアプリケーションが表示されます。

5.4.1. ストリームパイプラインドメイン固有言語– DSL

DSLは、アプリケーション間の構成とデータフローを定義します。 SCDFDSLはシンプルです。 最初の単語では、アプリケーションの名前を定義し、その後に構成を定義します。

また、構文はUnixに着想を得たパイプライン構文であり、複数のアプリケーションを接続するために「パイプ」とも呼ばれる垂直バーを使用します。

http --port=8181 | log

これにより、ポート8181で提供されるHTTPアプリケーションが作成され、受信した本文のペイロードがログに送信されます。

それでは、JDBCソースのDSLストリーム定義を作成する方法を見てみましょう。

5.4.2. JDBCソースストリームの定義

JDBCソースの主要な構成は、queryおよびupdateです。queryは未読レコードを選択し、updateはフラグを変更して現在のレコードが再読み取りされないようにします。

また、30秒の固定遅延でポーリングし、最大1000行をポーリングするようにJDBCソースを定義します。 最後に、ドライバー、ユーザー名、パスワード、接続URLなどの接続の構成を定義します。

jdbc 
    --query='SELECT id, customer_name FROM public.customer WHERE imported = false'
    --update='UPDATE public.customer SET imported = true WHERE id in (:id)'
    --max-rows-per-poll=1000
    --fixed-delay=30 --time-unit=SECONDS
    --driver-class-name=org.postgresql.Driver
    --url=jdbc:postgresql://localhost:5432/crm
    --username=postgres
    --password=postgres

その他のJDBCソース構成プロパティはここにあります。

5.4.3. 顧客のMongoDBシンクストリームの定義

customer-mongodb-sinkapplication.propertiesで接続構成を定義しなかったため、DSLパラメーターを使用して構成します。

私たちのアプリケーションは完全にに基づいています MongoDataAutoConfiguration。 他の可能な構成を確認できますここ。 基本的に、私たちは定義します spring .data.mongodb.uri

customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4. ストリームを作成してデプロイする

まず、最終的なストリーム定義を作成するには、シェルに戻って次のコマンドを実行します(改行なしで、読みやすくするために挿入されています)。

stream create --name jdbc-to-mongodb 
  --definition "jdbc 
  --query='SELECT id, customer_name FROM public.customer WHERE imported=false' 
  --fixed-delay=30 
  --max-rows-per-poll=1000 
  --update='UPDATE customer SET imported=true WHERE id in (:id)' 
  --time-unit=SECONDS 
  --password=postgres 
  --driver-class-name=org.postgresql.Driver 
  --username=postgres 
  --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink 
  --spring.data.mongodb.uri=mongodb://localhost/main"

このストリームDSLは、jdbc -to-mongodbという名前のストリームを定義します。 次に、名前でストリームをデプロイします。

stream deploy --name jdbc-to-mongodb

最後に、ログ出力に使用可能なすべてのログの場所が表示されます。

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. 結論

この記事では、Spring Cloudデータフローを使用したETLデータパイプラインの完全な例を見てきました。

最も注目に値するのは、アプリケーションスターターの構成を確認し、Spring Cloud Data Flow Shellを使用してETLストリームパイプラインを作成し、データの読み取り、変換、書き込み用のカスタムアプリケーションを実装したことです。

いつものように、サンプルコードはGitHubプロジェクトのにあります。