1. 序章

給与明細の処理、利息の計算、請求書の生成などのタスクを手動で完了する必要があると想像してください。 それは非常に退屈で、エラーが発生しやすく、手動タスクの終わりのないリストになります!

このチュートリアルでは、Jakarta EEプラットフォームの一部であるJavaバッチ処理( JSR 352 )と、このようなタスクを自動化するための優れた仕様について説明します。 アプリケーション開発者がビジネスロジックに集中できるように、堅牢なバッチ処理システムを開発するためのモデルを提供します。

2. Mavenの依存関係

JSR 352は単なる仕様であるため、[X54X] jberet のように、そのAPI実装を含める必要があります。

<dependency>
    <groupId>javax.batch</groupId>
    <artifactId>javax.batch-api</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-core</artifactId>
    <version>1.0.2.Final</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-support</artifactId>
    <version>1.0.2.Final</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-se</artifactId>
    <version>1.0.2.Final</version>
</dependency>

また、メモリ内データベースを追加して、より現実的なシナリオを確認できるようにします。

3. 重要な概念

JSR 352には、次のように見ることができるいくつかの概念が導入されています。

まず、各ピースを定義しましょう。

  • 左側から、JobOperatorがあります。 開始、停止、再開などのジョブ処理のすべての側面を管理します
  • 次に、ジョブがあります。 ジョブは、ステップの論理的なコレクションです。 バッチプロセス全体をカプセル化します
  • ジョブには、1〜n個のStepが含まれます。 各ステップは、独立した連続した作業単位です。 ステップは、読み取り入力、処理その入力、および書き込み出力で構成されます。
  • 最後になりましたが、ジョブの実行情報を格納するJobRepositoryがあります。 ジョブ、その状態、および完了結果を追跡するのに役立ちます

手順にはこれよりも少し詳細があるので、次にそれを見てみましょう。 まず、チャンクの手順を見てから、バッチレットを見ていきます。

4. チャンクの作成

先に述べたように、チャンクは一種のステップですチャンクを使用して、アイテムのセットなど、繰り返し実行される操作を表現することがよくあります。 これは、JavaStreamsの中間操作のようなものです。

チャンクを説明するときは、アイテムをどこから取得するか、どのように処理するか、後でどこに送信するかを表現する必要があります。

4.1. 読書アイテム

アイテムを読むには、ItemReader。を実装する必要があります

この場合、1から10までの数字を単純に出力するリーダーを作成します。

@Named
public class SimpleChunkItemReader extends AbstractItemReader {
    private Integer[] tokens;
    private Integer count;
    
    @Inject
    JobContext jobContext;

    @Override
    public Integer readItem() throws Exception {
        if (count >= tokens.length) { 
            return null;
        }

        jobContext.setTransientUserData(count);
        return tokens[count++];
    }

    @Override
    public void open(Serializable checkpoint) throws Exception {
        tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 };
        count = 0;
    }
}

ここでは、クラスの内部状態から読み取っています。 ただし、もちろん、 readItemは、データベース、ファイルシステム、またはその他の外部ソースからプルできます。

JobContext#setTransientUserData()を使用して、この内部状態の一部を保存していることに注意してください。これは後で役立ちます。

また、チェックポイントパラメータにも注意してください。 それもまた取り上げます。

4.2. アイテムの処理

もちろん、チャンク化する理由は、アイテムに対して何らかの操作を実行したいからです。

アイテムプロセッサからnullを返すときはいつでも、そのアイテムをバッチから削除します。

それで、ここで、偶数だけを保持したいとしましょう。 null を返すことにより、奇数を拒否するItemProcessorを使用できます。

@Named
public class SimpleChunkItemProcessor implements ItemProcessor {
    @Override
    public Integer processItem(Object t) {
        Integer item = (Integer) t;
        return item % 2 == 0 ? item : null;
    }
}

processItem は、ItemReaderが発行するアイテムごとに1回呼び出されます。

4.3. アイテムを書く

最後に、ジョブは ItemWriter を呼び出して、変換されたアイテムを記述できるようにします。

@Named
public class SimpleChunkWriter extends AbstractItemWriter {
    List<Integer> processed = new ArrayList<>();
    @Override
    public void writeItems(List<Object> items) throws Exception {
        items.stream().map(Integer.class::cast).forEach(processed::add);
    }
}

アイテムの長さは?すぐに、チャンクのサイズを定義します。これにより、writeItemsに送信されるリストのサイズが決まります。

4.4. ジョブでチャンクを定義する

ここで、JSLまたはジョブ仕様言語を使用してこれらすべてをXMLファイルにまとめます。 リーダー、プロセッサー、チャンカー、およびチャンクサイズもリストすることに注意してください。

<job id="simpleChunk">
    <step id="firstChunkStep" >
        <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
        </chunk>    
    </step>
</job>

チャンクサイズは、チャンクの進行状況がジョブリポジトリにコミットされる頻度です。これは、システムの一部に障害が発生した場合に、完了を保証するために重要です。

このファイルは、 META-INF /batch-jobsfor。jarfilesおよびWEB-INF/ classes / META-INF/batch-jobsに配置する必要があります。 .warファイルの場合は

仕事にID“ simpleChunk”、を付けたので、単体テストで試してみましょう。

現在、ジョブは非同期で実行されるため、テストが難しくなります。サンプルでは、ジョブが完了するまでポーリングして待機するBatchTestHelperを確認してください。

@Test
public void givenChunk_thenBatch_completesWithSuccess() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleChunk", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

これがチャンクです。 それでは、バッチレットを見てみましょう。

5. バッチレットの作成

すべてが反復モデルにうまく適合するわけではありません。 たとえば、を1回呼び出し、実行して完了し、終了ステータスを返すだけのタスクがある場合があります。

バッチレットの契約は非常に簡単です。

@Named
public class SimpleBatchLet extends AbstractBatchlet {
 
    @Override
    public String process() throws Exception {
        return BatchStatus.COMPLETED.toString();
    }
}

JSLと同様に:

<job id="simpleBatchLet">
    <step id="firstStep" >
        <batchlet ref="simpleBatchLet"/>
    </step>
</job>

そして、以前と同じアプローチを使用してテストできます。

@Test
public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleBatchLet", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

そこで、ステップを実装するためのいくつかの異なる方法を見てきました。

次に、マーキングと進行状況の保証のメカニズムを見てみましょう。

6. カスタムチェックポイント

失敗は仕事の途中で必ず起こります。 最初からやり直す必要がありますか、それとも中断したところからやり直すことができますか?

名前が示すように、チェックポイントは、障害が発生した場合に定期的にブックマークを設定するのに役立ちます。

デフォルトでは、チャンク処理の終了は自然なチェックポイントです

ただし、独自のCheckpointAlgorithmを使用してカスタマイズできます。

@Named
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {
    
    @Inject
    JobContext jobContext;
    
    @Override
    public boolean isReadyToCheckpoint() throws Exception {
        int counterRead = (Integer) jobContext.getTransientUserData();
        return counterRead % 5 == 0;
    }
}

以前に一時データに入れたカウントを覚えていますか? ここで、 JobContext#getTransientUserData を使用してそれを引き出し、処理された5番目の数値ごとにコミットすることを示すことができます。

これがないと、各チャンクの終わり、この場合は3番目の数字ごとにコミットが発生します。

次に、チャンクの下にあるXMLのcheckout-algorithmディレクティブと一致させます。

<job id="customCheckPoint">
    <step id="firstChunkStep" >
        <chunk item-count="3" checkpoint-policy="custom">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
            <checkpoint-algorithm ref="customCheckPoint"/>
        </chunk>    
    </step>
</job>

コードをテストしてみましょう。ここでも、ボイラープレートの手順の一部がBatchTestHelperに隠されていることに注意してください。

@Test
public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception {
    // ... start job and wait for completion

    jobOperator.getStepExecutions(executionId)
      .stream()
      .map(BatchTestHelper::getCommitCount)
      .forEach(count -> assertEquals(3L, count.longValue()));
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

したがって、10個のアイテムがあり、5番目ごとにコミットを構成しているため、コミット数は2になると予想される場合があります。 ただし、フレームワークは最後にもう1つの最終読み取りコミットを実行し、すべてが処理されたことを確認します。これにより、3になります。

次に、エラーの処理方法を見てみましょう。

7. 例外処理

デフォルトでは、ジョブオペレータは、例外が発生した場合にジョブをFAILEDとしてマークします。

アイテムリーダーを変更して、失敗することを確認しましょう。

@Override
public Integer readItem() throws Exception {
    if (tokens.hasMoreTokens()) {
        String tempTokenize = tokens.nextToken();
        throw new RuntimeException();
    }
    return null;
}

そして、テストします。

@Test
public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception {
    // ... start job and wait for completion
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
}

ただし、このデフォルトの動作はいくつかの方法でオーバーライドできます。

  • skip-limit は、このステップが失敗する前に無視する例外の数を指定します
  • try-limit は、ジョブオペレーターが失敗する前にステップを再試行する必要がある回数を指定します
  • skipppable-exception-class は、チャンク処理が無視する一連の例外を指定します

したがって、説明のために、 RuntimeException やその他のいくつかを無視するように、ジョブを編集できます。

<job id="simpleErrorSkipChunk" >
    <step id="errorStep" >
        <chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
            <reader ref="myItemReader"/>
            <processor ref="myItemProcessor"/>
            <writer ref="myItemWriter"/>
            <skippable-exception-classes>
                <include class="java.lang.RuntimeException"/>
                <include class="java.lang.UnsupportedOperationException"/>
            </skippable-exception-classes>
            <retryable-exception-classes>
                <include class="java.lang.IllegalArgumentException"/>
                <include class="java.lang.UnsupportedOperationException"/>
            </retryable-exception-classes>
        </chunk>
    </step>
</job>

そして今、私たちのコードは合格します:

@Test
public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception {
   // ... start job and wait for completion
   jobOperator.getStepExecutions(executionId).stream()
     .map(BatchTestHelper::getProcessSkipCount)
     .forEach(skipCount -> assertEquals(1L, skipCount.longValue()));
   assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8. 複数のステップの実行

先ほど、ジョブには任意の数のステップを含めることができると述べたので、それを見てみましょう。

8.1. 次のステップの発砲

デフォルトでは、各ステップはジョブの最後のステップです。

バッチジョブ内で次のステップを実行するには、ステップ定義内でnext属性を使用して明示的に指定する必要があります。

<job id="simpleJobSequence">
    <step id="firstChunkStepStep1" next="firstBatchStepStep2">
        <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
        </chunk>    
    </step>
    <step id="firstBatchStepStep2" >
        <batchlet ref="simpleBatchLet"/>
    </step>
</job>

この属性を忘れると、次のステップは実行されません。

そして、これがAPIでどのように見えるかを見ることができます。

@Test
public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception {
    // ... start job and wait for completion
    assertEquals(2 , jobOperator.getStepExecutions(executionId).size());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8.2. 流れ

一連のステップは、フローにカプセル化することもできます。 フローが終了すると、実行要素に移行するのはフロー全体です。 また、フロー内の要素はフロー外の要素に移行できません。

たとえば、フロー内で2つのステップを実行してから、そのフローを分離されたステップに移行させることができます。

<job id="flowJobSequence">
    <flow id="flow1" next="firstBatchStepStep3">
        <step id="firstChunkStepStep1" next="firstBatchStepStep2">
            <chunk item-count="3">
	        <reader ref="simpleChunkItemReader" />
		<processor ref="simpleChunkItemProcessor" />
		<writer ref="simpleChunkWriter" />
	    </chunk>
	</step>
	<step id="firstBatchStepStep2">
	    <batchlet ref="simpleBatchLet" />
	</step>
    </flow>
    <step id="firstBatchStepStep3">
	 <batchlet ref="simpleBatchLet" />
    </step>
</job>

そして、各ステップの実行を個別に確認できます。

@Test
public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception {
    // ... start job and wait for completion
 
    assertEquals(3, jobOperator.getStepExecutions(executionId).size());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8.3. 決定

decisionsの形式でif/elseサポートもあります。 決定はステップ、フロー、および分割の間のシーケンスを決定するカスタマイズされた方法を提供します

手順と同様に、ジョブの実行を指示または終了できるnextなどの遷移要素で機能します。

ジョブを構成する方法を見てみましょう。

<job id="decideJobSequence">
     <step id="firstBatchStepStep1" next="firstDecider">
	 <batchlet ref="simpleBatchLet" />
     </step>	
     <decision id="firstDecider" ref="deciderJobSequence">
        <next on="two" to="firstBatchStepStep2"/>
        <next on="three" to="firstBatchStepStep3"/>
     </decision>
     <step id="firstBatchStepStep2">
	<batchlet ref="simpleBatchLet" />
     </step>	
     <step id="firstBatchStepStep3">
	<batchlet ref="simpleBatchLet" />
     </step>		
</job>

decision 要素は、Deciderを実装するクラスで構成する必要があります。 その仕事は、決定をStringとして返すことです。

決定内の各nextは、スイッチステートメントのケースのようなものです。

8.4. 分割

Splits は、フローを同時に実行できるので便利です。

<job id="splitJobSequence">
   <split id="split1" next="splitJobSequenceStep3">
      <flow id="flow1">
	  <step id="splitJobSequenceStep1">
              <batchlet ref="simpleBatchLet" />
           </step>
      </flow>
      <flow id="flow2">
          <step id="splitJobSequenceStep2">
              <batchlet ref="simpleBatchLet" />
	  </step>
      </flow>
   </split>
   <step id="splitJobSequenceStep3">
      <batchlet ref="simpleBatchLet" />
   </step>
</job>

もちろん、これは注文が保証されないことを意味します

それらがすべて実行されることを確認しましょう。 フローステップは任意の順序で実行されますが、分離されたステップは常に最後になります:

@Test
public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception {
    // ... start job and wait for completion
    List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);

    assertEquals(3, stepExecutions.size());
    assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

9. ジョブの分割

また、ジョブで定義されたJavaコード内のバッチプロパティを使用することもできます。

ジョブ、ステップ、バッチアーティファクトの3つのレベルでスコープを設定できます。

彼らがどのように消費したかのいくつかの例を見てみましょう。

プロパティをジョブレベルで使用する場合:

@Inject
JobContext jobContext;
...
jobProperties = jobContext.getProperties();
...

これは、ステップレベルでも消費できます。

@Inject
StepContext stepContext;
...
stepProperties = stepContext.getProperties();
...

バッチアーティファクトレベルでプロパティを使用する場合:

@Inject
@BatchProperty(name = "name")
private String nameString;

これはパーティションに便利です。

分割を使用すると、フローを同時に実行できます。 ただし、ステップをnセットのアイテムに分割したり、個別の入力を設定したりすることもできます。これにより、作業を複数のスレッドに分割する別の方法が可能になります。[X01X]

各パーティションが実行する必要のある作業のセグメントを理解するために、プロパティをパーティションと組み合わせることができます。

<job id="injectSimpleBatchLet">
    <properties>
        <property name="jobProp1" value="job-value1"/>
    </properties>
    <step id="firstStep">
        <properties>
            <property name="stepProp1" value="value1" />
        </properties>
	<batchlet ref="injectSimpleBatchLet">
	    <properties>
		<property name="name" value="#{partitionPlan['name']}" />
	    </properties>
	</batchlet>
	<partition>
	    <plan partitions="2">
		<properties partition="0">
		    <property name="name" value="firstPartition" />
		</properties>
		<properties partition="1">
		    <property name="name" value="secondPartition" />
		</properties>
	    </plan>
	</partition>
    </step>
</job>

10. 停止して再起動します

さて、これでジョブを定義できます。 それでは、それらの管理について少し話しましょう。

単体テストでは、BatchRuntimeからJobOperatorのインスタンスを取得できることをすでに確認しています。

JobOperator jobOperator = BatchRuntime.getJobOperator();

そして、私たちは仕事を始めることができます:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

ただし、ジョブを停止することもできます。

jobOperator.stop(executionId);

そして最後に、ジョブを再開できます。

executionId = jobOperator.restart(executionId, new Properties());

実行中のジョブを停止する方法を見てみましょう。

@Test
public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleBatchLet", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobOperator.stop(executionId);
    jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
}

また、バッチが STOPPED の場合、再起動できます。

@Test
public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() {
    // ... start and stop the job
 
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
    executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties());
    jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId));
 
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

11. ジョブの取得

バッチジョブが送信されると、バッチランタイムはそれを追跡するためにJobExecutionのインスタンスを作成します

実行IDのJobExecutionを取得するには、 JobOperator#getJobExecution(executionId)メソッドを使用できます。

また、 StepExecutionは、ステップの実行を追跡するための役立つ情報を提供します

実行IDのStepExecutionを取得するには、 JobOperator#getStepExecutions(executionId)メソッドを使用できます。

そして、そこから、 StepExecution#getMetrics:を介して、ステップに関するいくつかのメトリックを取得できます。

@Test
public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception {
    // ... start job and wait for completion
    assertTrue(jobOperator.getJobNames().contains("simpleChunk"));
    assertTrue(jobOperator.getParameters(executionId).isEmpty());
    StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0);
    Map<Metric.MetricType, Long> metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
    assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue());
    assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue());
    assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue());
    assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue());
    // ... and many more!
}

12. 短所

JSR 352は強力ですが、いくつかの領域に欠けています。

  • JSONなどの他の形式を処理できるリーダーとライターが不足しているようです
  • ジェネリックのサポートはありません
  • パーティショニングは単一のステップのみをサポートします
  • APIは、スケジューリングをサポートするものを何も提供していません(ただし、J2EEには別個のスケジューリングモジュールがあります)
  • 非同期であるため、テストは困難な場合があります
  • APIは非常に冗長です

13. 結論

この記事では、JSR 352を見て、チャンク、バッチレット、スプリット、フローなどについて学びました。 それでも、表面をほとんど傷つけていません。

いつものように、デモコードはGitHubにあります。