1. 概要

以前のSpringBatch の紹介では、バッチ処理ツールとしてフレームワークを紹介しました。 また、シングルスレッドのシングルプロセスジョブ実行の構成の詳細と実装についても説明しました。

いくつかの並列処理でジョブを実装するために、さまざまなオプションが提供されています。 より高いレベルでは、並列処理には2つのモードがあります。

  1. シングルプロセス、マルチスレッド
  2. マルチプロセス

このクイック記事では、 Step のパーティショニングについて説明します。これは、シングルプロセスジョブとマルチプロセスジョブの両方に実装できます。

2. ステップの分割

パーティショニングを備えたSpringBatchは、Stepの実行を分割する機能を提供します。

上の図は、パーティション化されたStepを使用したJobの実装を示しています。

「マスター」と呼ばれるステップがあり、その実行はいくつかの「スレーブ」ステップに分割されています。 これらのスレーブはマスターの代わりになることができ、結果は変更されません。 マスターとスレーブの両方がStepのインスタンスです。 スレーブは、リモートサービスにすることも、ローカルで実行するスレッドにすることもできます。

必要に応じて、マスターからスレーブにデータを渡すことができます。 メタデータ(つまり JobRepository )は、Jobの1回の実行ですべてのスレーブが1回だけ実行されるようにします。

すべてがどのように機能するかを示すシーケンス図を次に示します。

示されているように、PartitionStepが実行を駆動しています。 PartitionHandler は、「マスター」の作業を「スレーブ」に分割する役割を果たします。 右端のステップはスレーブです。

3. Maven POM

Mavenの依存関係は、以前の記事で説明したものと同じです。 つまり、Spring Core、Spring Batch、およびデータベースの依存関係(この場合、 SQLite )です。

4. 構成

紹介の記事では、いくつかの財務データをCSVからXMLファイルに変換する例を見ました。 同じ例を拡張してみましょう。

ここでは、マルチスレッド実装を使用して、財務情報を5つのCSVファイルから対応するXMLファイルに変換します。

これは、単一のJobおよびStepパーティショニングを使用して実現できます。 CSVファイルごとに1つずつ、合計5つのスレッドがあります。

まず、ジョブを作成しましょう。

@Bean(name = "partitionerJob")
public Job partitionerJob() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob")
      .start(partitionStep())
      .build();
}

ご覧のとおり、このJobPartitioningStepで始まります。 これは、さまざまなスレーブステップに分割されるマスターステップです。

@Bean
public Step partitionStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
      .partitioner("slaveStep", partitioner())
      .step(slaveStep())
      .taskExecutor(taskExecutor())
      .build();
}

ここでは、StepBuilderFactoryを使用してPartitioningStepを作成します。 そのためには、SlaveStepsPartitionerに関する情報を提供する必要があります。

パーティショナーは、各スレーブの入力値のセットを定義する機能を提供するインターフェースです。 つまり、タスクをそれぞれのスレッドに分割するロジックがここにあります。

CustomMultiResourcePartitioner という実装を作成してみましょう。ここで、入力ファイル名と出力ファイル名を ExecutionContext に入れて、すべてのスレーブステップに渡します。

public class CustomMultiResourcePartitioner implements Partitioner {
 
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0, k = 1;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: " 
              + resource);
            context.putString(keyName, resource.getFilename());
            context.putString("opFileName", "output"+k+++".xml");
            map.put(PARTITION_KEY + i, context);
            i++;
        }
        return map;
    }
}

このクラスのBeanも作成します。ここで、入力ファイルのソースディレクトリを指定します。

@Bean
public CustomMultiResourcePartitioner partitioner() {
    CustomMultiResourcePartitioner partitioner 
      = new CustomMultiResourcePartitioner();
    Resource[] resources;
    try {
        resources = resoursePatternResolver
          .getResources("file:src/main/resources/input/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving"
          + " the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}

リーダーとライターを使用する他のステップと同様に、スレーブステップを定義します。 リーダーとライターは、紹介の例で見たものと同じですが、ファイル名パラメーターを StepExecutionContext。

これらのBeanは、すべてのステップで stepExecutionContext パラメーターを受信できるように、ステップスコープを設定する必要があることに注意してください。 ステップスコープが設定されていない場合、Beanは最初に作成され、ステップレベルでファイル名を受け入れません。

@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
  @Value("#{stepExecutionContext[fileName]}") String filename)
  throws UnexpectedInputException, ParseException {
 
    FlatFileItemReader<Transaction> reader 
      = new FlatFileItemReader<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens 
      = {"username", "userid", "transactiondate", "amount"};
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper 
      = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller, 
  @Value("#{stepExecutionContext[opFileName]}") String filename)
  throws MalformedURLException {
    StaxEventItemWriter<Transaction> itemWriter 
      = new StaxEventItemWriter<Transaction>();
    itemWriter.setMarshaller(marshaller);
    itemWriter.setRootTagName("transactionRecord");
    itemWriter.setResource(new ClassPathResource("xml/" + filename));
    return itemWriter;
}

スレーブステップでリーダーとライターについて言及している間、これらのファイル名は stepExecutionContext からファイル名を受け取るため、使用されないため、引数をnullとして渡すことができます。

@Bean
public Step slaveStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
      .reader(itemReader(null))
      .writer(itemWriter(marshaller(), null))
      .build();
}

5. 結論

このチュートリアルでは、SpringBatchを使用して並列処理でジョブを実装する方法について説明しました。

いつものように、この例の完全な実装は、GitHubから入手できます。