SpringCloudデータフローを使用したバッチ処理
1. 概要
シリーズの最初のの記事では、 Spring Cloud Data Flow のアーキテクチャコンポーネントと、それを使用してストリーミングデータパイプラインを作成する方法を紹介しました。
無制限の量のデータが処理されるストリームパイプラインとは対照的に、バッチプロセスにより、タスクがオンデマンドで実行される短期間のサービスを簡単に作成できます。
2. ローカルデータフローサーバーとシェル
ローカルデータフローサーバーはアプリケーションの展開を担当するコンポーネントであり、データフローシェルを使用すると、
前の記事では、 Spring Initilizr を使用して、両方をSpringBootアプリケーションとして設定しました。
@EnableDataFlowServer アノテーションをサーバーのメインクラスに追加し、 @ EnableDataFlowShell アノテーションをシェルのメインクラスにそれぞれ追加すると、準備が整います。実行することによって起動されます:
mvn spring-boot:run
サーバーはポート9393で起動し、シェルはプロンプトからサーバーと対話する準備が整います。
ローカルデータフローサーバーとそのシェルクライアントを取得して使用する方法の詳細については、前の記事を参照してください。
3. バッチアプリケーション
サーバーとシェルの場合と同様に、 Spring Initilizr を使用して、ルートSpringBootバッチアプリケーションをセットアップできます。
ウェブサイトにアクセスしたら、グループ、アーティファクトの名前を選択し、依存関係の検索ボックスからクラウドタスクを選択します。
これが完了したら、プロジェクトの生成ボタンをクリックして、Mavenアーティファクトのダウンロードを開始します。
アーティファクトは事前構成されており、基本的なコードが付属しています。 バッチアプリケーションを構築するためにそれを編集する方法を見てみましょう。
3.1. Mavenの依存関係
まず、Mavenの依存関係をいくつか追加しましょう。 これはバッチアプリケーションであるため、 Spring BatchProjectからライブラリをインポートする必要があります。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
また、Spring Cloud Taskはリレーショナルデータベースを使用して実行されたタスクの結果を格納するため、RDBMSドライバーに依存関係を追加する必要があります。
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
Springが提供するH2インメモリデータベースを使用することを選択しました。 これにより、開発をブートストラップする簡単な方法が得られます。 ただし、実稼働環境では、独自のDataSourceを構成する必要があります。
アーティファクトのバージョンは、Spring Bootの親pom.xmlファイルから継承されることに注意してください。
3.2. メインクラス
目的の機能を有効にするための重要なポイントは、@EnableTaskおよび@EnableBatchProcessingアノテーションをSpring Bootのメインクラスに追加することです。 このクラスレベルのアノテーションは、SpringCloudTaskにすべてをブートストラップするように指示します。
@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchJobApplication {
public static void main(String[] args) {
SpringApplication.run(BatchJobApplication.class, args);
}
}
3.3. ジョブ構成
最後に、ジョブを構成しましょう。この場合は、Stringをログファイルに簡単に出力します。
@Configuration
public class JobConfiguration {
private static Log logger
= LogFactory.getLog(JobConfiguration.class);
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory.get("jobStep1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
logger.info("Job was run");
return RepeatStatus.FINISHED;
}
}).build()).build();
}
}
これで、アプリケーションの準備が整いました。 ローカルのMavenリポジトリ内にインストールしましょう。 これを行うには、 cd をプロジェクトのルートディレクトリに挿入し、次のコマンドを発行します。
mvn clean install
次に、アプリケーションをデータフローサーバー内に配置します。
4. アプリケーションの登録
アプリケーションをAppRegistryに登録するには、一意の名前、アプリケーションタイプ、およびアプリアーティファクトに解決できるURIを指定する必要があります。
Spring Cloud Data Flow Shell に移動し、プロンプトからコマンドを発行します。
app register --name batch-job --type task
--uri maven://com.baeldung.spring.cloud:batch-job:jar:0.0.1-SNAPSHOT
5. タスクの作成
タスク定義は、次のコマンドを使用して作成できます。
task create myjob --definition batch-job
これにより、以前に登録されたバッチジョブアプリケーションを指すmyjobという名前の新しいタスクが作成されます。
現在のタスク定義のリストは、次のコマンドを使用して取得できます。
task list
6. タスクの起動
タスクを起動するには、次のコマンドを使用できます。
task launch myjob
タスクが起動されると、タスクの状態がリレーショナルDBに保存されます。 次のコマンドを使用して、タスクの実行ステータスを確認できます。
task execution list
7. 結果の確認
この例では、ジョブは単にログファイルに文字列を出力します。 ログファイルは、 Data FlowServerのログ出力に表示されるディレクトリ内にあります。
結果を確認するには、ログを調整します。
tail -f PATH_TO_LOG\spring-cloud-dataflow-2385233467298102321\myjob-1472827120414\myjob
[...] --- [main] o.s.batch.core.job.SimpleStepHandler: Executing step: [jobStep1]
[...] --- [main] o.b.spring.cloud.JobConfiguration: Job was run
[...] --- [main] o.s.b.c.l.support.SimpleJobLauncher:
Job: [SimpleJob: [name=job]] completed with the following parameters:
[{}] and the following status: [COMPLETED]
8. 結論
この記事では、 Spring Cloud DataFlowを使用してバッチ処理を処理する方法を示しました。
サンプルコードはGitHubプロジェクトにあります。