Spring Cloudデータフローを使用したETL
1.概要
Spring Cloud Data Flow
は、リアルタイムのデータパイプラインとバッチプロセスを構築するためのクラウドネイティブのツールキットです。 Spring Cloud Data Flowは、単純なインポート/エクスポート、ETL処理、イベントストリーミング、予測分析など、さまざまなデータ処理のユースケースにすぐに使用できます。
このチュートリアルでは、JDBCデータベースからデータを抽出し、それを単純なPOJOに変換してMongoDBにロードするストリームパイプラインを使用したリアルタイムの変換とロードの抽出(ETL)の例を学習します。
2. ETLとイベントストリーム処理
ETL(抽出、変換、およびロード)は、一般に、複数のデータベースおよびシステムから共通のデータウェアハウスにデータをバッチロードするプロセスと呼ばれていました。このデータウェアハウスでは、システム全体のパフォーマンスを犠牲にすることなく、大量のデータ分析処理を実行することが可能です。
しかし、これがどのように行われるかによって、新しい傾向が変わりつつあります。 ETLには、データウェアハウスやデータレイクへのデータ転送にまだ役割があります。
現在、これはSpring Cloud Data Flow
の助けを借りて
イベントストリームアーキテクチャの** ストリームで行うことができます。
3. Spring Cloudのデータフロー
Spring Cloud Data Flow(SCDF)を使用すると、開発者は2つのフレーバーでデータパイプラインを作成できます。
-
Spring Cloud Streamを使用した長寿命のリアルタイムストリームアプリケーション
-
Spring Cloud Taskを使用した短期間のバッチタスクアプリケーション
この記事では、最初のSpring Cloud Streamをベースにした長寿命のストリーミングアプリケーションについて説明します。
3.1. Spring Cloud Streamアプリケーション
SCDF Streamパイプラインは複数のステップで構成されています。各ステップは、Spring Cloud Streamマイクロフレームワークを使用してSpring Bootスタイルで構築されたアプリケーションです。
これらのアプリケーションは、ソース、プロセッサ、およびシンクに分類されます。
ETLプロセスと比較すると、ソースは「抽出」、プロセッサは「トランス」、シンクは「ロード」部分と言えます。
場合によっては、パイプラインの1つ以上のステップでアプリケーションスターターを使用できます。つまり、ステップのために新しいアプリケーションを実装する必要はありませんが、代わりに、既に実装されている既存のアプリケーションスターターを構成します。
-
アプリケーションスターターのリストはhttps://cloud.spring.io/spring-cloud-stream-app-starters/[here].** にあります。
3.2. Spring Cloudデータフローサーバ
-
アーキテクチャの最後の部分はSpring Cloud Data Flow Server ** です。 SCDFサーバーは、Spring Cloud Deployer仕様を使用してアプリケーションとパイプラインストリームのデプロイを行います。この仕様は、Kubernetes、Apache Mesos、Yarn、およびCloud Foundryなどの一連の最新のランタイムにデプロイすることによって、SCDFクラウド固有のフレーバーをサポートします。
また、ストリームをローカル展開として実行することもできます。
SCDFアーキテクチャの詳細については、https://www.baeldung.com/spring-cloud-data-flow-stream-processing[ここ]を参照してください。
4.環境設定
始める前に、この複雑な展開の部分を選択する必要があります。最初に定義するのはSCDFサーバーです。
テストには、ローカル開発にSCDF Server Localを使用します。実稼働環境では、後でhttps://cloud.spring.io/spring-cloud-dataflow-server-kubernetes/[CDF Server Kubernetes]のようにクラウドネイティブランタイムを選択できます。
here
のサーバーランタイムのリストを見つけることができます。
それでは、このサーバーを実行するためのシステム要件を確認しましょう。
4.1. システム要求
SCDFサーバーを実行するには、2つの依存関係を定義して設定する必要があります。
メッセージングミドルウェア
-
RDBMS
メッセージングミドルウェアについては、RabbitMQを使用します。パイプラインストリームの定義を保存するためのRDBMSとしてPostgreSQLを選択します。
RabbitMQを実行するには、最新バージョンhttps://www.rabbitmq.com/download.html[here]をダウンロードし、デフォルト設定を使用してRabbitMQインスタンスを起動するか、次のDockerコマンドを実行します。
docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
最後のセットアップ手順として、デフォルトポート5432にPostgreSQL RDBMSをインストールして実行します。その後、次のスクリプトを使用してSCDFがそのストリーム定義を格納できるデータベースを作成します。
CREATE DATABASE dataflow;
4.2. Spring Cloud Data Flow Serverローカル
SCDFサーバーローカルを実行するために、サーバーを起動することを選択できます。 -docker[using
docker-compose
、]またはJavaアプリケーションとして起動できます。
-
ここでは、SCDFサーバーローカルをJavaアプリケーションとして実行します** アプリケーションを設定するには、設定をJavaアプリケーションパラメータとして定義する必要があります。システムパスにJava 8が必要です。
jarファイルと依存関係をホストするには、SCDFサーバー用のホームフォルダーを作成し、このフォルダーにSCDFサーバーローカルディストリビューションをダウンロードする必要があります。 SCDFサーバーローカルの最新版https://cloud.spring.io/spring-cloud-dataflow/[here]をダウンロードできます。
最後に、SCDFローカルサーバーを実行しましょう。
$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
--spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
--spring.datasource.username=postgres__username \
--spring.datasource.password=postgres__password \
--spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.rabbitmq.host=127.0.0.1 \
--spring.rabbitmq.port=5672 \
--spring.rabbitmq.username=guest \
--spring.rabbitmq.password=guest
このURLを見て、実行されているかどうかを確認できます。
4.3. Spring Cloudデータフローシェル
SCDFシェルは、アプリケーションとパイプラインを簡単に作成して展開することができるコマンドラインツールです。これらのシェルコマンドは、Spring Cloud Data Flow Server
REST API
を介して実行されます。
最新バージョンのjarをSCDFホームフォルダーにダウンロードします(https://repo.spring.io/release/org/springframework/cloud/spring-cloud-dataflow-shell/)。次のコマンド(必要に応じてバージョンを更新)
$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
________ ________ __ ____
/______| __ ____ __ ____(__)__ ____ ____ __ /______| | ______ __ __ ____| |
\______ \| '__ \| '____| | '__ \/__` | | | | |/__ \| | | |/__` |
______) | |__) | | | | | | | (__| | | |______| | (__) | |__| | (__| |
|________/| .____/|__| |__|__| |__|\____, | \________|__|\______/\____,__|\____,__|
________ |__| __ ____|______/ ____________________
| __ \ ____ __| |__ ____ __ | ______| | __________ ____ \ \ \ \ \ \
| | | |/__` | ____/__` | | |__ | |/__ \ \/\// \ \ \ \ \ \
| |__| | (__| | || (__| | | __| | | (__) \ V V/ ////// |________/\____,__|\____\____,__| |__| |__|\______/\__/\__/ /__/__/__/__/__/
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
最後の行に“
dataflow:>”
の代わりに“
server-unknown:>”
が表示される場合は、SCDFサーバーをlocalhostで実行していません。この場合は、次のコマンドを実行して別のホストに接続します。
server-unknown:>dataflow config server http://{host}
これで、シェルはSCDFサーバーに接続され、コマンドを実行できます。
シェルで最初にすべきことは、アプリケーションスターターをインポートすることです。 Spring Boot 2.0.xのRabbitMQ Mavenの最新バージョンhttp://cloud.spring.io/spring-cloud-stream-app-starters/[ここ]を見つけて、次のコマンドを実行します(ここでもバージョンを更新します、ここで) ”
Darwin-SR1
“、必要に応じて):
$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven
インストールされているアプリケーションを確認するには、次のシェルコマンドを実行します。
$ dataflow:> app list
その結果、インストールされているすべてのアプリケーションを含むテーブルが表示されます。
また、SCDFは
Flo
という名前のグラフィカルインタフェースを提供しています。このインタフェースからアクセスできます:
http://localhost:9393/dashboard
。ただし、その使用方法はこの記事の範囲内ではありません。
5. ETLパイプラインを作成する
それでは、ストリームパイプラインを作成しましょう。これを行うために、リレーショナルデータベースから情報を抽出するためにJDBC Sourceアプリケーションスターターを使用します。
また、データ構造を変換するためのカスタムプロセッサと、データをMongoDBにロードするためのカスタムシンクを作成します。
5.1. 抽出 – 抽出のためのリレーショナルデータベースの準備
crm
という名前のデータベースと
customer
という名前のテーブルを作成しましょう。
CREATE DATABASE crm;
CREATE TABLE customer (
id bigint NOT NULL,
imported boolean DEFAULT false,
customer__name character varying(50),
PRIMARY KEY(id)
)
私たちは
imported
というフラグを使っていることに注意してください。必要ならば、この情報を別のテーブルに格納することもできます。
それでは、データを挿入しましょう。
INSERT INTO customer(id, customer__name, imported) VALUES (1, 'John Doe', false);
5.2. 変換 – JDBCフィールドのMongoDBフィールド構造へのマッピング
変換ステップでは、ソーステーブルからのフィールド
customer
name
を新しいフィールド
name__に簡単に変換します。他の変換もここで行うことができますが、例を短くしましょう。
-
これを行うには、
customer-transform
という名前の新しいプロジェクトを作成します。** これを行う最も簡単な方法は、https://start.spring.io/[Spring Initializr]サイトを使用してプロジェクトを作成することです。 Webサイトにアクセスしたら、グループとアーティファクト名を選択します。それぞれ
com.customer
と__customer-transformを使用します。
これが完了したら、「Generate Project」ボタンをクリックしてプロジェクトをダウンロードします。次に、プロジェクトを解凍して好きなIDEにインポートし、
pom.xml
に次の依存関係を追加します。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
これで、フィールド名変換のコーディングを始める準備が整いました。これを行うには、アダプタとして機能する
Customer
クラスを作成します。このクラスは
setName()
メソッドを介して
customer
name
を受け取り、
getName
メソッドを介してその値を出力します
.
__
__ @ JsonProperty
__アノテーションは、JSONからJavaへのデシリアライズ中に変換を行います。
public class Customer {
private Long id;
private String name;
@JsonProperty("customer__name")
public void setName(String name) {
this.name = name;
}
@JsonProperty("name")
public String getName() {
return name;
}
//Getters and Setters
}
プロセッサは、入力からデータを受け取り、変換を行い、結果を出力チャネルにバインドする必要があります。これを行うためのクラスを作成しましょう。
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Customer convertToPojo(Customer payload) {
return payload;
}
}
上記のコードでは、変換が自動的に行われることがわかります。入力はJSONとしてデータを受け取り、Jacksonは
set
メソッドを使用してデータを
Customer
オブジェクトにデシリアライズします。
反対は出力で、データは
get
メソッドを使ってJSONにシリアライズされます。
5.3. 負荷 – MongoDBのシンク
変換ステップと同様に、
customer-mongodb-sink
という名前の別のMavenプロジェクトを作成します。再度https://start.spring.io/[Spring Initializr]にアクセスし、Groupには
com.customer
を選択し、Artifactには
customer-mongodb-sink
を選択します。
次に、依存関係検索ボックスに
“
MongoDB
“
と入力してプロジェクトをダウンロードします。
次に、解凍してお気に入りのIDEにインポートします。
次に、
customer-transform
プロジェクトと同じ追加の依存関係を追加します。
ここで、このステップで入力を受け取るための
Customer
クラスをもう1つ作成します。
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection="customer")
public class Customer {
private Long id;
private String name;
//Getters and Setters
}
Customer
をシンクするために、
CustomerRepository
を使用して顧客エンティティを保存するListenerクラスを作成します。
@EnableBinding(Sink.class)
public class CustomerListener {
@Autowired
private CustomerRepository repository;
@StreamListener(Sink.INPUT)
public void save(Customer customer) {
repository.save(customer);
}
}
この場合の
CustomerRepository
は、Spring Dataの
MongoRepository
です。
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {
}
5.4. ストリーム定義
これで、両方のカスタムアプリケーションをSCDFサーバーに登録する準備ができました。** これを行うには、Mavenコマンド
mvn install
を使用して両方のプロジェクトをコンパイルします。
次に、Spring Cloud Data Flow Shellを使用してそれらを登録します。
app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT
最後に、アプリケーションがSCDFに格納されているかどうかを確認し、シェルでapplication listコマンドを実行します。
app list
その結果、結果の表に両方のアプリケーションが表示されます。
==== 5.4.1。ストリームパイプラインドメイン固有言語 – DSL
DSLは、アプリケーション間の構成とデータフローを定義します。 SCDF DSLは簡単です。最初の単語では、アプリケーションの名前を定義し、その後に設定を続けます。
また、この構文はUnix風のhttps://en.wikipedia.org/wiki/Pipeline__(Unix)[Pipeline構文]であり、複数のアプリケーションを接続するために「パイプ」とも呼ばれる垂直バーを使用します。
http --port=8181 | log
これにより、受信したボディペイロードをログに送信するポート8181で提供されるHTTPアプリケーションが作成されます。
それでは、JDBCソースのDSLストリーム定義を作成する方法を見てみましょう。
5.4.2. JDBCソースストリーム定義
JDBCソースの主な設定は
query
と
update
です。
-
query
は未読レコードを選択し、
update
は現在のレコードが再読されないようにフラグを変更します。
また、JDBCソースを30秒の固定遅延でポーリングし、最大1000行をポーリングするように定義します。最後に、ドライバ、ユーザー名、パスワード、接続URLなどの接続設定を定義します。
jdbc
--query='SELECT id, customer__name FROM public.customer WHERE imported = false'
--update='UPDATE public.customer SET imported = true WHERE id in (:id)'
--max-rows-per-poll=1000
--fixed-delay=30 --time-unit=SECONDS
--driver-class-name=org.postgresql.Driver
--url=jdbc:postgresql://localhost:5432/crm
--username=postgres
--password=postgres
その他のJDBCソース設定プロパティはhttps://docs.spring.io/spring-cloud-stream-app-starters/docs/Celsius.SR2/reference/htmlsingle/#spring-cloud-stream-modules-jdbc-sourceにあります。[ここに]。
5.4.3. 顧客MongoDBシンクストリームの定義
customer-mongodb-sink
の
application.properties
で接続構成を定義しなかったので、DSLパラメータを通して構成します。
私たちのアプリケーションは
MongoDataAutoConfiguration.
に完全に基づいています。あなたは他の可能な設定をチェックアウトすることができますfeatures-mongodb[ここ。]基本的に、
spring.data.mongodb.uri
を定義します。
customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main
5.4.4. ストリームを作成してデプロイする
まず、最終的なストリーム定義を作成するには、シェルに戻って次のコマンドを実行します(改行なしで、読みやすくするために挿入されています)。
stream create --name jdbc-to-mongodb
--definition "jdbc
--query='SELECT id, customer__name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink
--spring.data.mongodb.uri=mongodb://localhost/main"
このストリームDSLはjdbc
_-から-
_
mongodbという名前のストリームを定義します。次に、
ストリームをその名前で展開します
:
stream deploy --name jdbc-to-mongodb
最後に、ログ出力で利用可能なすべてのログの場所を確認します。
Logs will be in {PATH__TO__LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink
Logs will be in {PATH__TO__LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform
Logs will be in {PATH__TO__LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc
6.まとめ
この記事では、Spring Cloud Data Flowを使用したETLデータパイプラインの完全な例を見ました。
最も注目に値するのは、アプリケーションスターターの設定を見て、Spring Cloud Data Flow Shellを使用してETLストリームパイプラインを作成し、データの読み取り、変換、および書き込み用のカスタムアプリケーションを実装したことです。
いつものように、サンプルコードは見つけることができます
https://github.com/eugenp/tutorials/tree/master/spring-cloud-data-flow/etl
[in
GitHubプロジェクト。