1. 概要

デフォルトでは、Springバッチジョブは、実行中に発生したエラーに対して失敗します。 ただし、断続的な障害に対処するために、アプリケーションの復元力を向上させたい場合があります。

このクイックチュートリアルでは、Springバッチフレームワークで再試行ロジックを構成する方法について説明します。

2. ユースケースの例

入力CSVファイルを読み取るバッチジョブがあるとします。

username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321

次に、RESTエンドポイントをヒットして、ユーザーのageおよびpostCode属性をフェッチすることにより、各レコードを処理します。

public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
    
    @Override
    public Transaction process(Transaction transaction) throws IOException {
        log.info("RetryItemProcessor, attempting to process: {}", transaction);
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        //parse user's age and postCode from response and update transaction
        ...
        return transaction;
    }
    ...
}

そして最後に、統合された出力XMLを生成します。

<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>

3. ItemProcessorへの再試行の追加

では、ネットワークの速度が遅いために、RESTエンドポイントへの接続がタイムアウトした場合はどうなるでしょうか。 その場合、バッチジョブは失敗します。

このような場合、失敗したアイテムの処理を数回再試行することをお勧めします。 それで、失敗した場合に最大3回の再試行を実行するようにバッチジョブを構成しましょう

@Bean
public Step retryStep(
  ItemProcessor<Transaction, Transaction> processor,
  ItemWriter<Transaction> writer) throws ParseException {
    return stepBuilderFactory
      .get("retryStep")
      .<Transaction, Transaction>chunk(10)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .faultTolerant()
      .retryLimit(3)
      .retry(ConnectTimeoutException.class)
      .retry(DeadlockLoserDataAccessException.class)
      .build();
}

ここでは、再試行機能を有効にするために faultTolerant()を呼び出しています。 さらに、はretryとretryLimitを使用して、アイテムの再試行と最大再試行回数の対象となる例外をそれぞれ定義します。

4. 再試行のテスト

agepostCodeを返すRESTエンドポイントがしばらくの間ダウンしたテストシナリオを考えてみましょう。 このテストシナリオでは、最初の2つのAPI呼び出しに対してのみ ConnectTimeoutException が発生し、3番目の呼び出しは成功します。

@Test
public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
    FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
    FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

    when(httpResponse.getEntity())
      .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
 
    //fails for first two calls and passes third time onwards
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Timeout count 1"))
      .thenThrow(new ConnectTimeoutException("Timeout count 2"))
      .thenReturn(httpResponse);

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
    AssertFile.assertFileEquals(expectedResult, actualResult);
}

ここで、私たちの仕事は無事に完了しました。 さらに、ログから、 id = 1234の最初のレコードが2回失敗し、最後に3回目の再試行で成功したことが明らかです。

19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

同様に、別のテストケースを用意して、すべての再試行が終了したときに何が起こるかを確認しましょう

@Test
public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Endpoint is down"));

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
    assertThat(actualJobExitStatus.getExitDescription(),
      containsString("org.apache.http.conn.ConnectTimeoutException"));
}

この場合、ConnectTimeoutException が原因でジョブが最終的に失敗する前に、最初のレコードに対して3回の再試行が試行されました。

5. XMLを使用した再試行の構成

最後に、上記の構成に相当するXMLを見てみましょう。

<batch:job id="retryBatchJob">
    <batch:step id="retryStep">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="retryItemProcessor" commit-interval="10"
              retry-limit="3">
                <batch:retryable-exception-classes>
                    <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

6. 結論

この記事では、SpringBatchで再試行ロジックを構成する方法を学びました。 JavaとXMLの両方の構成を調べました。

また、単体テストを使用して、再試行が実際にどのように機能するかを確認しました。

いつものように、このチュートリアルのサンプルコードは、GitHubでから入手できます。