1前書き

給与明細の処理、金利の計算、請求書の作成などのタスクを手動で完了しなければならなかったとします。それはかなり退屈で、エラーを起こしやすく、そして終わりのない手作業のリストになるでしょう!

このチュートリアルでは、Java Batch Processing(https://jcp.org/en/jsr/detail?id=352[JSR 352])、Jakarta EEプラットフォームの一部、および優れた仕様について説明します。このような作業を自動化するためのものです。

ビジネスロジックに集中できるように、堅牢なバッチ処理システムを開発するモデルをアプリケーション開発者に提供します。

2. Mavenの依存関係

JSR 352は単なる仕様なので、https://search.maven.org/artifact/javax.batch/javax.batch-api[API]とhttps://search.maven.orgを含める必要があります。


jberet


:のように/search?q=org.jberet[implementation]。

<dependency>
    <groupId>javax.batch</groupId>
    <artifactId>javax.batch-api</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-core</artifactId>
    <version>1.0.2.Final</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-support</artifactId>
    <version>1.0.2.Final</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-se</artifactId>
    <version>1.0.2.Final</version>
</dependency>

より現実的なシナリオをいくつか見ることができるように、インメモリデータベースも追加します。


3主な概念

JSR 352ではいくつかの概念が導入されていますが、それをこのように見ることができます。

まず各部分を定義しましょう。

  • 左側から始めて、

    JobOperator

    があります。すべて管理**

開始、停止、再開などのジョブ処理の側面


次に、

Job

があります。ジョブはステップの論理的な集まりです。それ

バッチプロセス全体をカプセル化します
** ジョブは1からnまでの

__Step

__を含みます。各ステップは

独立した順次作業単位。ステップが構成されている

入力

入力

入力

処理

出力

書き込み
** そして最後になりましたが、

__JobRepository

__which store

ジョブの実行情報それは仕事、彼らの状態、そして彼らの完了結果を追跡するのに役立ちます

ステップはこれよりもう少し詳細なので、次にそれを見てみましょう。最初に、

Chunk

ステップを見てから、


Batchlet __sを見てみましょう。


4チャンクを作成する

前述したように、チャンクは一種のステップです

__.

__私達はチャンクを使用して何度も何度も実行される操作を表現します。 Java Streamsからの中間操作のようなものです。

チャンクについて説明するときは、どこからアイテムを取り出すのか、それらをどのように処理するのか、そしてどこに後でそれらを送信するのかを表現する必要があります。

4.1. 読み物

  • アイテムを読むためには、__ItemReaderを実装する必要があります。

この場合は、1から10までの数字を出力するリーダーを作成します。

@Named
public class SimpleChunkItemReader extends AbstractItemReader {
    private Integer[]tokens;
    private Integer count;

    @Inject
    JobContext jobContext;

    @Override
    public Integer readItem() throws Exception {
        if (count >= tokens.length) {
            return null;
        }

        jobContext.setTransientUserData(count);
        return tokens[count++];
    }

    @Override
    public void open(Serializable checkpoint) throws Exception {
        tokens = new Integer[]{ 1,2,3,4,5,6,7,8,9,10 };
        count = 0;
    }
}

さて、ここではクラスの内部状態から読んでいるだけです。しかし、もちろん、**

__ readItem

__はデータベースから、ファイルシステムから、あるいは他の外部ソースから取得することができます。


JobContext#setTransientUserData()

を使用してこの内部状態の一部を保存しています。これは後で役に立ちます。


  • __checkpoint

    __parameter ** にも注意してください。私たちもまたそれを取り上げるつもりです。

4.2. 処理アイテム

もちろん、私たちがチャンクしているのは、私たちのアイテムに対してある種の操作を実行したいからです。

アイテムプロセッサから

null

が返されるたびに、そのアイテムをバッチから削除します。

それで、ここで、偶数だけを残したいとしましょう。

null

を返すことで、奇数のものを拒否する

ItemProcessor

を使用できます。

@Named
public class SimpleChunkItemProcessor implements ItemProcessor {
    @Override
    public Integer processItem(Object t) {
        Integer item = (Integer) t;
        return item % 2 == 0 ? item : null;
    }
}


  • ItemReader

    が発行するアイテムごとに

    processItem

    が1回呼び出されます。

4.3. アイテムを書く

最後に、ジョブは

ItemWriter

を呼び出すので、変換されたアイテムを書き込めます。

@Named
public class SimpleChunkWriter extends AbstractItemWriter {
    List<Integer> processed = new ArrayList<>();
    @Override
    public void writeItems(List<Object> items) throws Exception {
        items.stream().map(Integer.class::cast).forEach(processed::add);
    }
}


  • items

    の長さは?** しばらくして、チャンクのサイズを定義します。これにより、

    writeItems

    に送信されるリストのサイズが決まります。

4.4. ジョブにチャンクを定義する

JSLまたはJob Specification Languageを使用して、これらすべてをXMLファイルにまとめました。読者、プロセッサ、チャンク、そしてチャンクサイズもリストします。

<job id="simpleChunk">
    <step id="firstChunkStep" >
        <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
        </chunk>
    </step>
</job>

  • チャンクサイズは、チャンクの進行がジョブリポジトリにコミットされる頻度** です。これは、システムの一部に障害が発生した場合の完了を保証するために重要です。

このファイルは、.

__ jarファイルについては

META-INF/batch-jobs

に、

.war

ファイルについては

WEB-INF/classes/META-INF/batch-jobs__に配置する必要があります

私たちは自分の仕事にid “simpleChunk”を与えました。

  • これで、ジョブは非同期的に実行されるため、テストが難しくなります。** サンプルでは、​​ジョブが完了するまでポーリングして待機する

    __BatchTestHelper

    __を必ずチェックしてください。

@Test
public void givenChunk__thenBatch__completesWithSuccess() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleChunk", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

それで、チャンクは何ですか。それでは、バッチレットを見てみましょう。

5.バッチレットを作成する

すべてが反復モデ​​ルにうまく適合するわけではありません。たとえば、1回だけ呼び出し、最後まで実行して終了ステータスを返すだけのタスクがあるかもしれません。

バッチレットの契約は非常に簡単です。

@Named
public class SimpleBatchLet extends AbstractBatchlet {

    @Override
    public String process() throws Exception {
        return BatchStatus.COMPLETED.toString();
    }
}

JSLと同様に、

<job id="simpleBatchLet">
    <step id="firstStep" >
        <batchlet ref="simpleBatchLet"/>
    </step>
</job>

そして以前と同じ方法でテストすることができます。

@Test
public void givenBatchlet__thenBatch__completeWithSuccess() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleBatchLet", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

そこで、ステップを実装するためのいくつかの異なる方法を見ました。

それでは、進歩を記録し保証するためのメカニズムを見てみましょう。


6. カスタムチェックポイント

失敗は仕事の途中で起きることになっています。 ** すべてをやり直すだけでいいのでしょうか、それとも、中断したところから始めてもいいですか?

名前が示すように、

checkpoints

は失敗した場合に定期的にブックマークを設定するのに役立ちます。

  • デフォルトでは、チャンク処理の終わりは自然なチェックポイントです** 。

ただし、独自の

CheckpointAlgorithm

を使用してカスタマイズできます。

@Named
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {

    @Inject
    JobContext jobContext;

    @Override
    public boolean isReadyToCheckpoint() throws Exception {
        int counterRead = (Integer) jobContext.getTransientUserData();
        return counterRead % 5 == 0;
    }
}

以前に一時データに入れたカウントを覚えていますか?ここで、処理された5番目の番号ごとにコミットしたいことを示すために、



__

JobContext#getTransientUserData


__を使用してそれを引き出すことができます。

これがなければ、コミットは各チャンクの終わり、あるいは私たちの場合では3番目の数字ごとに起こります。

  • そして、それをXMLの

    __checkout-algorithm

    __directiveとチャンクの下に合わせます。

<job id="customCheckPoint">
    <step id="firstChunkStep" >
        <chunk item-count="3" checkpoint-policy="custom">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
            <checkpoint-algorithm ref="customCheckPoint"/>
        </chunk>
    </step>
</job>

コードをテストしましょう。ここでも、いくつかの定型的なステップが

BatchTestHelper

に隠されていることに注意してください。

@Test
public void givenChunk__whenCustomCheckPoint__thenCommitCountIsThree() throws Exception {
   //... start job and wait for completion

    jobOperator.getStepExecutions(executionId)
      .stream()
      .map(BatchTestHelper::getCommitCount)
      .forEach(count -> assertEquals(3L, count.longValue()));
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

そのため、10個の項目があり、5項目ごとになるようにコミットを設定しているので、コミット数は2になると予想されるかもしれません。しかし、

フレームワークは、最後にもう1つ最終コミットを行い

すべてが処理されたことを確認します。これが、3になります。

次に、エラーを処理する方法を見てみましょう。


7. 例外処理

デフォルトでは、** ジョブオペレータは例外の場合にジョブを____FAILEDとマークします

失敗したことを確認するためにアイテムリーダーを変更しましょう。

@Override
public Integer readItem() throws Exception {
    if (tokens.hasMoreTokens()) {
        String tempTokenize = tokens.nextToken();
        throw new RuntimeException();
    }
    return null;
}

そしてテストします。

@Test
public void whenChunkError__thenBatch__CompletesWithFailed() throws Exception {
   //... start job and wait for completion
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
}

しかし、このデフォルトの振る舞いをいくつかの方法でオーバーライドすることができます。

  • このステップまでに無視する例外の数を指定します

失敗する
**

__retry-limit

__は、ジョブオペレータが行うべき回数を指定します

失敗する前にステップを再試行してください
** チャンク処理が無視する例外のセットを指定します

そのため、説明のためだけに、

RuntimeException

とその他のいくつかを無視するようにジョブを編集できます。

<job id="simpleErrorSkipChunk" >
    <step id="errorStep" >
        <chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
            <reader ref="myItemReader"/>
            <processor ref="myItemProcessor"/>
            <writer ref="myItemWriter"/>
            <skippable-exception-classes>
                <include class="java.lang.RuntimeException"/>
                <include class="java.lang.UnsupportedOperationException"/>
            </skippable-exception-classes>
            <retryable-exception-classes>
                <include class="java.lang.IllegalArgumentException"/>
                <include class="java.lang.UnsupportedOperationException"/>
            </retryable-exception-classes>
        </chunk>
    </step>
</job>

そして今、私たちのコードは渡します:

@Test
public void givenChunkError__thenErrorSkipped__CompletesWithSuccess() throws Exception {
  //... start job and wait for completion
   jobOperator.getStepExecutions(executionId).stream()
     .map(BatchTestHelper::getProcessSkipCount)
     .forEach(skipCount -> assertEquals(1L, skipCount.longValue()));
   assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}


8複数のステップを実行する

先に述べたように、仕事はいくつでもステップを踏むことができるので、今それを見ましょう。

8.1. 次のステップへ

デフォルトでは、

各ステップはジョブの最後のステップです

バッチジョブ内で次のステップを実行するには、ステップ定義内で

next

属性を使用して明示的に指定する必要があります。

<job id="simpleJobSequence">
    <step id="firstChunkStepStep1" next="firstBatchStepStep2">
        <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
        </chunk>
    </step>
    <step id="firstBatchStepStep2" >
        <batchlet ref="simpleBatchLet"/>
    </step>
</job>

この属性を忘れると、次のステップは実行されません。

そして、これがAPIでどのように見えるかを見ることができます。

@Test
public void givenTwoSteps__thenBatch__CompleteWithSuccess() throws Exception {
   //... start job and wait for completion
    assertEquals(2 , jobOperator.getStepExecutions(executionId).size());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8.2. 流れ

一連のステップを

flow

にカプセル化することもできます。

フローが終了すると、実行要素

に遷移するのはフロー全体です。また、フロー内の要素はフロー外の要素には移行できません。

たとえば、フロー内で2つのステップを実行してから、そのフローを独立したステップに移行させることができます。

<job id="flowJobSequence">
    <flow id="flow1" next="firstBatchStepStep3">
        <step id="firstChunkStepStep1" next="firstBatchStepStep2">
            <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
        <processor ref="simpleChunkItemProcessor"/>
        <writer ref="simpleChunkWriter"/>
        </chunk>
    </step>
    <step id="firstBatchStepStep2">
        <batchlet ref="simpleBatchLet"/>
    </step>
    </flow>
    <step id="firstBatchStepStep3">
     <batchlet ref="simpleBatchLet"/>
    </step>
</job>

それでも、各ステップの実行は独立しています。

@Test
public void givenFlow__thenBatch__CompleteWithSuccess() throws Exception {
   //... start job and wait for completion

    assertEquals(3, jobOperator.getStepExecutions(executionId).size());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

8.3. 決定事項

if/elseが

decisions

の形でサポートされています。

決定は、


ステップ、フロー、および分割

の間の順序を決定するためのカスタマイズされた方法を提供します。

ステップと同様に、ジョブの実行を指示または終了できる

next

などの遷移要素に対して機能します。

ジョブを設定する方法を見てみましょう。

<job id="decideJobSequence">
     <step id="firstBatchStepStep1" next="firstDecider">
     <batchlet ref="simpleBatchLet"/>
     </step>
     <decision id="firstDecider" ref="deciderJobSequence">
        <next on="two" to="firstBatchStepStep2"/>
        <next on="three" to="firstBatchStepStep3"/>
     </decision>
     <step id="firstBatchStepStep2">
    <batchlet ref="simpleBatchLet"/>
     </step>
     <step id="firstBatchStepStep3">
    <batchlet ref="simpleBatchLet"/>
     </step>
</job>


decision

要素はすべて、

Decider

を実装するクラスで構成する必要があります。その仕事は

String

として決定を返すことです。


decision

内のそれぞれの

next

は、


switch-

switchステートメントの中の


case

のようなものです。

8.4. 分割


Splits

はフローを同時に実行できるので便利です。

<job id="splitJobSequence">
   <split id="split1" next="splitJobSequenceStep3">
      <flow id="flow1">
      <step id="splitJobSequenceStep1">
              <batchlet ref="simpleBatchLet"/>
           </step>
      </flow>
      <flow id="flow2">
          <step id="splitJobSequenceStep2">
              <batchlet ref="simpleBatchLet"/>
      </step>
      </flow>
   </split>
   <step id="splitJobSequenceStep3">
      <batchlet ref="simpleBatchLet"/>
   </step>
</job>

もちろん、これは順序が保証されないことを意味します。

まだ実行されていることを確認しましょう。

フローステップは任意の順序で実行されますが、分離されたステップは常に最後になります。

@Test
public void givenSplit__thenBatch__CompletesWithSuccess() throws Exception {
   //... start job and wait for completion
    List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);

    assertEquals(3, stepExecutions.size());
    assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

** 9ジョブの分割

私たちの仕事で定義されている私たちのJavaコード内のバッチプロパティも消費することができます。

  • それらは、仕事、ステップ、そしてバッチアーティファクトの3つのレベルでスコープ指定できます。

それらがどのように消費したかの例をいくつか見てみましょう。

ジョブレベルでプロパティを消費したい場合:

@Inject
JobContext jobContext;
...
jobProperties = jobContext.getProperties();
...

これはステップレベルでも消費できます。

@Inject
StepContext stepContext;
...
stepProperties = stepContext.getProperties();
...

バッチ成果物レベルでプロパティーを消費したい場合は、次のようにします。

@Inject
@BatchProperty(name = "name")
private String nameString;

これはパーティションに便利です。

分割を使うと、フローを同時に実行できることがわかります。 ** しかし、アイテムを____n個のセットに分割したり、別々の入力を設定したりすることで、作業を複数のスレッドに分割することもできます。

各パーティションが行うべき作業のセグメントを理解するために、プロパティとパーティションを組み合わせることができます。

<job id="injectSimpleBatchLet">
    <properties>
        <property name="jobProp1" value="job-value1"/>
    </properties>
    <step id="firstStep">
        <properties>
            <property name="stepProp1" value="value1"/>
        </properties>
    <batchlet ref="injectSimpleBatchLet">
        <properties>
        <property name="name" value="#{partitionPlan['name']}"/>
        </properties>
    </batchlet>
    <partition>
        <plan partitions="2">
        <properties partition="0">
            <property name="name" value="firstPartition"/>
        </properties>
        <properties partition="1">
            <property name="name" value="secondPartition"/>
        </properties>
        </plan>
    </partition>
    </step>
</job>


10停止して再起動

今、それは仕事を定義するためのものです。それでは、それらの管理について少し説明しましょう。


__JobOperator


from

BatchRuntime__のインスタンスを取得できることは、私たちのユニットテストで既に見てきました。

JobOperator jobOperator = BatchRuntime.getJobOperator();

そして、私たちは仕事を始めることができます。

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

しかし、仕事をやめることもできます。

jobOperator.stop(executionId);

そして最後に、仕事を再開できます。

executionId = jobOperator.restart(executionId, new Properties());

実行中の仕事を停止する方法を見てみましょう。

@Test
public void givenBatchLetStarted__whenStopped__thenBatchStopped() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleBatchLet", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobOperator.stop(executionId);
    jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
}

そして、バッチが

STOPPED

の場合、それを再開できます。

@Test
public void givenBatchLetStopped__whenRestarted__thenBatchCompletesSuccess() {
   //... start and stop the job

    assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
    executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties());
    jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId));

    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}


11求人を取得する

バッチジョブが投入されると、バッチランタイムはそれを追跡するために

JobExecution

のインスタンスを作成します。

実行IDの

JobExecution

を取得するには、

JobOperator#getJobExecution(executionId)

メソッドを使用できます。

また、


StepExecution

は、ステップの実行を追跡するのに役立つ情報を提供します

実行IDの

StepExecution

を取得するには、

JobOperator#getStepExecutions(executionId)

メソッドを使用できます。

それから、

StepExecution#getMetrics:

を介してステップに関するhttps://docs.oracle.com/javaee/7/api/javax/batch/runtime/Metric.MetricType.html[いくつかのメトリック]を取得できます。

@Test
public void givenChunk__whenJobStarts__thenStepsHaveMetrics() throws Exception {
   //... start job and wait for completion
    assertTrue(jobOperator.getJobNames().contains("simpleChunk"));
    assertTrue(jobOperator.getParameters(executionId).isEmpty());
    StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0);
    Map<Metric.MetricType, Long> metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
    assertEquals(10L, metricTest.get(Metric.MetricType.READ__COUNT).longValue());
    assertEquals(5L, metricTest.get(Metric.MetricType.FILTER__COUNT).longValue());
    assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT__COUNT).longValue());
    assertEquals(5L, metricTest.get(Metric.MetricType.WRITE__COUNT).longValue());
   //... and many more!
}


12. デメリット

JSR 352は強力ですが、いくつかの分野で欠けています。

  • 他の人を処理することができる読者と作家の不足があるようです

JSONなどの形式
** ジェネリック医薬品のサポートはありません

  • パーティショニングはシングルステップのみをサポートします

  • APIはスケジューリングをサポートするものを提供していません(J2EEにはありますが)。

別のスケジューリングモジュール)
** その非同期的な性質のために、テストは難題かもしれません

  • APIはかなり冗長です


13. 結論

この記事では、JSR 352を見て、チャンク、バッチレット、分割、フローなどについて学びました。それでも、表面にはほとんど傷がありません。

いつものようにデモコードはhttps://github.com/eugenp/tutorials/tree/master/jee-7[over GitHub]にあります。