Java EE 7バッチ処理
1前書き
給与明細の処理、金利の計算、請求書の作成などのタスクを手動で完了しなければならなかったとします。それはかなり退屈で、エラーを起こしやすく、そして終わりのない手作業のリストになるでしょう!
このチュートリアルでは、Java Batch Processing(https://jcp.org/en/jsr/detail?id=352[JSR 352])、Jakarta EEプラットフォームの一部、および優れた仕様について説明します。このような作業を自動化するためのものです。
ビジネスロジックに集中できるように、堅牢なバッチ処理システムを開発するモデルをアプリケーション開発者に提供します。
2. Mavenの依存関係
JSR 352は単なる仕様なので、https://search.maven.org/artifact/javax.batch/javax.batch-api[API]とhttps://search.maven.orgを含める必要があります。
jberet
:のように/search?q=org.jberet[implementation]。
<dependency>
<groupId>javax.batch</groupId>
<artifactId>javax.batch-api</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-core</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-support</artifactId>
<version>1.0.2.Final</version>
</dependency>
<dependency>
<groupId>org.jberet</groupId>
<artifactId>jberet-se</artifactId>
<version>1.0.2.Final</version>
</dependency>
より現実的なシナリオをいくつか見ることができるように、インメモリデータベースも追加します。
3主な概念
JSR 352ではいくつかの概念が導入されていますが、それをこのように見ることができます。
まず各部分を定義しましょう。
-
左側から始めて、
JobOperator
があります。すべて管理**
開始、停止、再開などのジョブ処理の側面
次に、
Job
があります。ジョブはステップの論理的な集まりです。それ
バッチプロセス全体をカプセル化します
** ジョブは1からnまでの
__Step
__を含みます。各ステップは
独立した順次作業単位。ステップが構成されている
入力
入力
入力
処理
出力
書き込み
** そして最後になりましたが、
__JobRepository
__which store
ジョブの実行情報それは仕事、彼らの状態、そして彼らの完了結果を追跡するのに役立ちます
ステップはこれよりもう少し詳細なので、次にそれを見てみましょう。最初に、
Chunk
ステップを見てから、
Batchlet __sを見てみましょう。
4チャンクを作成する
前述したように、チャンクは一種のステップです
__.
__私達はチャンクを使用して何度も何度も実行される操作を表現します。 Java Streamsからの中間操作のようなものです。
チャンクについて説明するときは、どこからアイテムを取り出すのか、それらをどのように処理するのか、そしてどこに後でそれらを送信するのかを表現する必要があります。
4.1. 読み物
-
アイテムを読むためには、__ItemReaderを実装する必要があります。
この場合は、1から10までの数字を出力するリーダーを作成します。
@Named
public class SimpleChunkItemReader extends AbstractItemReader {
private Integer[]tokens;
private Integer count;
@Inject
JobContext jobContext;
@Override
public Integer readItem() throws Exception {
if (count >= tokens.length) {
return null;
}
jobContext.setTransientUserData(count);
return tokens[count++];
}
@Override
public void open(Serializable checkpoint) throws Exception {
tokens = new Integer[]{ 1,2,3,4,5,6,7,8,9,10 };
count = 0;
}
}
さて、ここではクラスの内部状態から読んでいるだけです。しかし、もちろん、**
__ readItem
__はデータベースから、ファイルシステムから、あるいは他の外部ソースから取得することができます。
JobContext#setTransientUserData()
を使用してこの内部状態の一部を保存しています。これは後で役に立ちます。
-
__checkpoint
__parameter ** にも注意してください。私たちもまたそれを取り上げるつもりです。
4.2. 処理アイテム
もちろん、私たちがチャンクしているのは、私たちのアイテムに対してある種の操作を実行したいからです。
アイテムプロセッサから
null
が返されるたびに、そのアイテムをバッチから削除します。
それで、ここで、偶数だけを残したいとしましょう。
null
を返すことで、奇数のものを拒否する
ItemProcessor
を使用できます。
@Named
public class SimpleChunkItemProcessor implements ItemProcessor {
@Override
public Integer processItem(Object t) {
Integer item = (Integer) t;
return item % 2 == 0 ? item : null;
}
}
-
ItemReader
が発行するアイテムごとに
processItem
が1回呼び出されます。
4.3. アイテムを書く
最後に、ジョブは
ItemWriter
を呼び出すので、変換されたアイテムを書き込めます。
@Named
public class SimpleChunkWriter extends AbstractItemWriter {
List<Integer> processed = new ArrayList<>();
@Override
public void writeItems(List<Object> items) throws Exception {
items.stream().map(Integer.class::cast).forEach(processed::add);
}
}
-
items
の長さは?** しばらくして、チャンクのサイズを定義します。これにより、
writeItems
に送信されるリストのサイズが決まります。
4.4. ジョブにチャンクを定義する
JSLまたはJob Specification Languageを使用して、これらすべてをXMLファイルにまとめました。読者、プロセッサ、チャンク、そしてチャンクサイズもリストします。
<job id="simpleChunk">
<step id="firstChunkStep" >
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
</job>
-
チャンクサイズは、チャンクの進行がジョブリポジトリにコミットされる頻度** です。これは、システムの一部に障害が発生した場合の完了を保証するために重要です。
このファイルは、.
__ jarファイルについては
META-INF/batch-jobs
に、
.war
ファイルについては
WEB-INF/classes/META-INF/batch-jobs__に配置する必要があります
私たちは自分の仕事にid “simpleChunk”を与えました。
-
これで、ジョブは非同期的に実行されるため、テストが難しくなります。** サンプルでは、ジョブが完了するまでポーリングして待機する
__BatchTestHelper
__を必ずチェックしてください。
@Test
public void givenChunk__thenBatch__completesWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleChunk", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
それで、チャンクは何ですか。それでは、バッチレットを見てみましょう。
5.バッチレットを作成する
すべてが反復モデルにうまく適合するわけではありません。たとえば、1回だけ呼び出し、最後まで実行して終了ステータスを返すだけのタスクがあるかもしれません。
バッチレットの契約は非常に簡単です。
@Named
public class SimpleBatchLet extends AbstractBatchlet {
@Override
public String process() throws Exception {
return BatchStatus.COMPLETED.toString();
}
}
JSLと同様に、
<job id="simpleBatchLet">
<step id="firstStep" >
<batchlet ref="simpleBatchLet"/>
</step>
</job>
そして以前と同じ方法でテストすることができます。
@Test
public void givenBatchlet__thenBatch__completeWithSuccess() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
そこで、ステップを実装するためのいくつかの異なる方法を見ました。
それでは、進歩を記録し保証するためのメカニズムを見てみましょう。
6. カスタムチェックポイント
失敗は仕事の途中で起きることになっています。 ** すべてをやり直すだけでいいのでしょうか、それとも、中断したところから始めてもいいですか?
名前が示すように、
checkpoints
は失敗した場合に定期的にブックマークを設定するのに役立ちます。
-
デフォルトでは、チャンク処理の終わりは自然なチェックポイントです** 。
ただし、独自の
CheckpointAlgorithm
を使用してカスタマイズできます。
@Named
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {
@Inject
JobContext jobContext;
@Override
public boolean isReadyToCheckpoint() throws Exception {
int counterRead = (Integer) jobContext.getTransientUserData();
return counterRead % 5 == 0;
}
}
以前に一時データに入れたカウントを覚えていますか?ここで、処理された5番目の番号ごとにコミットしたいことを示すために、
__
JobContext#getTransientUserData
__を使用してそれを引き出すことができます。
これがなければ、コミットは各チャンクの終わり、あるいは私たちの場合では3番目の数字ごとに起こります。
-
そして、それをXMLの
__checkout-algorithm
__directiveとチャンクの下に合わせます。
<job id="customCheckPoint">
<step id="firstChunkStep" >
<chunk item-count="3" checkpoint-policy="custom">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
<checkpoint-algorithm ref="customCheckPoint"/>
</chunk>
</step>
</job>
コードをテストしましょう。ここでも、いくつかの定型的なステップが
BatchTestHelper
に隠されていることに注意してください。
@Test
public void givenChunk__whenCustomCheckPoint__thenCommitCountIsThree() throws Exception {
//... start job and wait for completion
jobOperator.getStepExecutions(executionId)
.stream()
.map(BatchTestHelper::getCommitCount)
.forEach(count -> assertEquals(3L, count.longValue()));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
そのため、10個の項目があり、5項目ごとになるようにコミットを設定しているので、コミット数は2になると予想されるかもしれません。しかし、
フレームワークは、最後にもう1つ最終コミットを行い
すべてが処理されたことを確認します。これが、3になります。
次に、エラーを処理する方法を見てみましょう。
7. 例外処理
デフォルトでは、** ジョブオペレータは例外の場合にジョブを____FAILEDとマークします
失敗したことを確認するためにアイテムリーダーを変更しましょう。
@Override
public Integer readItem() throws Exception {
if (tokens.hasMoreTokens()) {
String tempTokenize = tokens.nextToken();
throw new RuntimeException();
}
return null;
}
そしてテストします。
@Test
public void whenChunkError__thenBatch__CompletesWithFailed() throws Exception {
//... start job and wait for completion
assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
}
しかし、このデフォルトの振る舞いをいくつかの方法でオーバーライドすることができます。
-
このステップまでに無視する例外の数を指定します
失敗する
**
__retry-limit
__は、ジョブオペレータが行うべき回数を指定します
失敗する前にステップを再試行してください
** チャンク処理が無視する例外のセットを指定します
そのため、説明のためだけに、
RuntimeException
とその他のいくつかを無視するようにジョブを編集できます。
<job id="simpleErrorSkipChunk" >
<step id="errorStep" >
<chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
<reader ref="myItemReader"/>
<processor ref="myItemProcessor"/>
<writer ref="myItemWriter"/>
<skippable-exception-classes>
<include class="java.lang.RuntimeException"/>
<include class="java.lang.UnsupportedOperationException"/>
</skippable-exception-classes>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
<include class="java.lang.UnsupportedOperationException"/>
</retryable-exception-classes>
</chunk>
</step>
</job>
そして今、私たちのコードは渡します:
@Test
public void givenChunkError__thenErrorSkipped__CompletesWithSuccess() throws Exception {
//... start job and wait for completion
jobOperator.getStepExecutions(executionId).stream()
.map(BatchTestHelper::getProcessSkipCount)
.forEach(skipCount -> assertEquals(1L, skipCount.longValue()));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
8複数のステップを実行する
先に述べたように、仕事はいくつでもステップを踏むことができるので、今それを見ましょう。
8.1. 次のステップへ
デフォルトでは、
各ステップはジョブの最後のステップです
。
バッチジョブ内で次のステップを実行するには、ステップ定義内で
next
属性を使用して明示的に指定する必要があります。
<job id="simpleJobSequence">
<step id="firstChunkStepStep1" next="firstBatchStepStep2">
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
<step id="firstBatchStepStep2" >
<batchlet ref="simpleBatchLet"/>
</step>
</job>
この属性を忘れると、次のステップは実行されません。
そして、これがAPIでどのように見えるかを見ることができます。
@Test
public void givenTwoSteps__thenBatch__CompleteWithSuccess() throws Exception {
//... start job and wait for completion
assertEquals(2 , jobOperator.getStepExecutions(executionId).size());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
8.2. 流れ
一連のステップを
flow
にカプセル化することもできます。
フローが終了すると、実行要素
に遷移するのはフロー全体です。また、フロー内の要素はフロー外の要素には移行できません。
たとえば、フロー内で2つのステップを実行してから、そのフローを独立したステップに移行させることができます。
<job id="flowJobSequence">
<flow id="flow1" next="firstBatchStepStep3">
<step id="firstChunkStepStep1" next="firstBatchStepStep2">
<chunk item-count="3">
<reader ref="simpleChunkItemReader"/>
<processor ref="simpleChunkItemProcessor"/>
<writer ref="simpleChunkWriter"/>
</chunk>
</step>
<step id="firstBatchStepStep2">
<batchlet ref="simpleBatchLet"/>
</step>
</flow>
<step id="firstBatchStepStep3">
<batchlet ref="simpleBatchLet"/>
</step>
</job>
それでも、各ステップの実行は独立しています。
@Test
public void givenFlow__thenBatch__CompleteWithSuccess() throws Exception {
//... start job and wait for completion
assertEquals(3, jobOperator.getStepExecutions(executionId).size());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
8.3. 決定事項
if/elseが
decisions
の形でサポートされています。
決定は、
ステップ、フロー、および分割
の間の順序を決定するためのカスタマイズされた方法を提供します。
ステップと同様に、ジョブの実行を指示または終了できる
next
などの遷移要素に対して機能します。
ジョブを設定する方法を見てみましょう。
<job id="decideJobSequence">
<step id="firstBatchStepStep1" next="firstDecider">
<batchlet ref="simpleBatchLet"/>
</step>
<decision id="firstDecider" ref="deciderJobSequence">
<next on="two" to="firstBatchStepStep2"/>
<next on="three" to="firstBatchStepStep3"/>
</decision>
<step id="firstBatchStepStep2">
<batchlet ref="simpleBatchLet"/>
</step>
<step id="firstBatchStepStep3">
<batchlet ref="simpleBatchLet"/>
</step>
</job>
decision
要素はすべて、
Decider
を実装するクラスで構成する必要があります。その仕事は
String
として決定を返すことです。
decision
内のそれぞれの
next
は、
switch-
switchステートメントの中の
case
のようなものです。
8.4. 分割
Splits
はフローを同時に実行できるので便利です。
<job id="splitJobSequence">
<split id="split1" next="splitJobSequenceStep3">
<flow id="flow1">
<step id="splitJobSequenceStep1">
<batchlet ref="simpleBatchLet"/>
</step>
</flow>
<flow id="flow2">
<step id="splitJobSequenceStep2">
<batchlet ref="simpleBatchLet"/>
</step>
</flow>
</split>
<step id="splitJobSequenceStep3">
<batchlet ref="simpleBatchLet"/>
</step>
</job>
もちろん、これは順序が保証されないことを意味します。
まだ実行されていることを確認しましょう。
フローステップは任意の順序で実行されますが、分離されたステップは常に最後になります。
@Test
public void givenSplit__thenBatch__CompletesWithSuccess() throws Exception {
//... start job and wait for completion
List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
assertEquals(3, stepExecutions.size());
assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName());
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
** 9ジョブの分割
私たちの仕事で定義されている私たちのJavaコード内のバッチプロパティも消費することができます。
-
それらは、仕事、ステップ、そしてバッチアーティファクトの3つのレベルでスコープ指定できます。
それらがどのように消費したかの例をいくつか見てみましょう。
ジョブレベルでプロパティを消費したい場合:
@Inject
JobContext jobContext;
...
jobProperties = jobContext.getProperties();
...
これはステップレベルでも消費できます。
@Inject
StepContext stepContext;
...
stepProperties = stepContext.getProperties();
...
バッチ成果物レベルでプロパティーを消費したい場合は、次のようにします。
@Inject
@BatchProperty(name = "name")
private String nameString;
これはパーティションに便利です。
分割を使うと、フローを同時に実行できることがわかります。 ** しかし、アイテムを____n個のセットに分割したり、別々の入力を設定したりすることで、作業を複数のスレッドに分割することもできます。
各パーティションが行うべき作業のセグメントを理解するために、プロパティとパーティションを組み合わせることができます。
<job id="injectSimpleBatchLet">
<properties>
<property name="jobProp1" value="job-value1"/>
</properties>
<step id="firstStep">
<properties>
<property name="stepProp1" value="value1"/>
</properties>
<batchlet ref="injectSimpleBatchLet">
<properties>
<property name="name" value="#{partitionPlan['name']}"/>
</properties>
</batchlet>
<partition>
<plan partitions="2">
<properties partition="0">
<property name="name" value="firstPartition"/>
</properties>
<properties partition="1">
<property name="name" value="secondPartition"/>
</properties>
</plan>
</partition>
</step>
</job>
10停止して再起動
今、それは仕事を定義するためのものです。それでは、それらの管理について少し説明しましょう。
__JobOperator
from
BatchRuntime__のインスタンスを取得できることは、私たちのユニットテストで既に見てきました。
JobOperator jobOperator = BatchRuntime.getJobOperator();
そして、私たちは仕事を始めることができます。
Long executionId = jobOperator.start("simpleBatchlet", new Properties());
しかし、仕事をやめることもできます。
jobOperator.stop(executionId);
そして最後に、仕事を再開できます。
executionId = jobOperator.restart(executionId, new Properties());
実行中の仕事を停止する方法を見てみましょう。
@Test
public void givenBatchLetStarted__whenStopped__thenBatchStopped() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
Long executionId = jobOperator.start("simpleBatchLet", new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
jobOperator.stop(executionId);
jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
}
そして、バッチが
STOPPED
の場合、それを再開できます。
@Test
public void givenBatchLetStopped__whenRestarted__thenBatchCompletesSuccess() {
//... start and stop the job
assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties());
jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId));
assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}
11求人を取得する
バッチジョブが投入されると、バッチランタイムはそれを追跡するために
JobExecution
のインスタンスを作成します。
実行IDの
JobExecution
を取得するには、
JobOperator#getJobExecution(executionId)
メソッドを使用できます。
また、
StepExecution
は、ステップの実行を追跡するのに役立つ情報を提供します
。
実行IDの
StepExecution
を取得するには、
JobOperator#getStepExecutions(executionId)
メソッドを使用できます。
それから、
StepExecution#getMetrics:
を介してステップに関するhttps://docs.oracle.com/javaee/7/api/javax/batch/runtime/Metric.MetricType.html[いくつかのメトリック]を取得できます。
@Test
public void givenChunk__whenJobStarts__thenStepsHaveMetrics() throws Exception {
//... start job and wait for completion
assertTrue(jobOperator.getJobNames().contains("simpleChunk"));
assertTrue(jobOperator.getParameters(executionId).isEmpty());
StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0);
Map<Metric.MetricType, Long> metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
assertEquals(10L, metricTest.get(Metric.MetricType.READ__COUNT).longValue());
assertEquals(5L, metricTest.get(Metric.MetricType.FILTER__COUNT).longValue());
assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT__COUNT).longValue());
assertEquals(5L, metricTest.get(Metric.MetricType.WRITE__COUNT).longValue());
//... and many more!
}
12. デメリット
JSR 352は強力ですが、いくつかの分野で欠けています。
-
他の人を処理することができる読者と作家の不足があるようです
JSONなどの形式
** ジェネリック医薬品のサポートはありません
-
パーティショニングはシングルステップのみをサポートします
-
APIはスケジューリングをサポートするものを提供していません(J2EEにはありますが)。
別のスケジューリングモジュール)
** その非同期的な性質のために、テストは難題かもしれません
-
APIはかなり冗長です
13. 結論
この記事では、JSR 352を見て、チャンク、バッチレット、分割、フローなどについて学びました。それでも、表面にはほとんど傷がありません。
いつものようにデモコードはhttps://github.com/eugenp/tutorials/tree/master/jee-7[over GitHub]にあります。