SpringBatch–タスクレットとチャンク
1. 序章
Spring Batchは、ジョブを実装するための2つの異なる方法を提供します。タスクレットとチャンクを使用する。
この記事では、簡単な実際の例を使用して、両方のメソッドを構成および実装する方法を学習します。
2. 依存関係
必要な依存関係を追加することから始めましょう:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>4.3.0</version>
<scope>test</scope>
</dependency>
spring-batch-coreおよびspring-batch-testの最新バージョンを入手するには、MavenCentralを参照してください。
3. 私たちのユースケース
次の内容のCSVファイルについて考えてみましょう。
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992
各行の最初の位置は人の名前を表し、2番目の位置はその人の生年月日を表します。
私たちのユースケースは、各人の名前と年齢を含む別のCSVファイルを生成することです:
Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
ドメインが明確になったので、先に進み、両方のアプローチを使用してソリューションを構築しましょう。 タスクレットから始めましょう。
4. タスクレットアプローチ
4.1. はじめにとデザイン
タスクレットは、ステップ内で単一のタスクを実行することを目的としています。 私たちの仕事は、次々に実行されるいくつかのステップで構成されます。 各ステップは、定義された1つのタスクのみを実行する必要があります。
私たちの仕事は3つのステップで構成されます。
- 入力CSVファイルから行を読み取ります。
- 入力CSVファイルですべての人の年齢を計算します。
- 各人の名前と年齢を新しい出力CSVファイルに書き込みます。
全体像の準備ができたので、ステップごとに1つのクラスを作成しましょう。
LinesReader は、入力ファイルからのデータの読み取りを担当します。
public class LinesReader implements Tasklet {
// ...
}
LinesProcessor は、ファイル内のすべての人の年齢を計算します。
public class LinesProcessor implements Tasklet {
// ...
}
最後に、 LinesWriter は、出力ファイルに名前と年齢を書き込む責任があります。
public class LinesWriter implements Tasklet {
// ...
}
この時点で、すべてのステップでタスクレットインターフェイスを実装します。 これにより、executeメソッドを実装する必要があります。
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
// ...
}
このメソッドは、各ステップのロジックを追加する場所です。 そのコードを始める前に、ジョブを構成しましょう。
4.2. 構成
Springのアプリケーションコンテキストにいくつかの構成を追加する必要があります。 前のセクションで作成したクラスに標準のbean宣言を追加したら、ジョブ定義を作成する準備が整います。
@Configuration
@EnableBatchProcessing
public class TaskletsConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
protected Step readLines() {
return steps
.get("readLines")
.tasklet(linesReader())
.build();
}
@Bean
protected Step processLines() {
return steps
.get("processLines")
.tasklet(linesProcessor())
.build();
}
@Bean
protected Step writeLines() {
return steps
.get("writeLines")
.tasklet(linesWriter())
.build();
}
@Bean
public Job job() {
return jobs
.get("taskletsJob")
.start(readLines())
.next(processLines())
.next(writeLines())
.build();
}
// ...
}
これは、「taskletsJob」が3つのステップで構成されることを意味します。 最初の1つ ( readLines )Beanで定義されたタスクレットを実行します linesReader 次のステップに進みます。
ジョブフローが定義され、ロジックを追加する準備が整いました。
4.3. モデルとユーティリティ
CSVファイルの行を操作するので、クラス Line:を作成します。
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
// standard constructor, getters, setters and toString implementation
}
LineはSerializableを実装していることに注意してください。これは、Lineがステップ間でデータを転送するDTOとして機能するためです。 Spring Batchによると、ステップ間で転送されるオブジェクトはシリアル化可能である必要があります。
一方で、行の読み書きについて考えることもできます。
そのために、OpenCSVを利用します。
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>4.1</version>
</dependency>
MavenCentralで最新のOpenCSVバージョンを探します。
OpenCSVが含まれると、FileUtilsクラスも作成します。 CSV行を読み書きするためのメソッドを提供します。
public class FileUtils {
public Line readLine() throws Exception {
if (CSVReader == null)
initReader();
String[] line = CSVReader.readNext();
if (line == null)
return null;
return new Line(
line[0],
LocalDate.parse(
line[1],
DateTimeFormatter.ofPattern("MM/dd/yyyy")));
}
public void writeLine(Line line) throws Exception {
if (CSVWriter == null)
initWriter();
String[] lineStr = new String[2];
lineStr[0] = line.getName();
lineStr[1] = line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
}
// ...
}
readLine は、OpenCSVの readNext メソッドのラッパーとして機能し、Lineオブジェクトを返すことに注意してください。
同様に、 writeLine は、Lineオブジェクトを受信するOpenCSVのwriteNextをラップします。 このクラスの完全な実装は、 GitHubProjectにあります。
この時点で、各ステップの実装を開始する準備が整いました。
4.4. LinesReader
先に進んで、LinesReaderクラスを完了しましょう。
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<>();
fu = new FileUtils(
"taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}
LinesReaderのexecuteメソッドは、入力ファイルパス上にFileUtilsインスタンスを作成します。 次に、は、読み取る行がなくなるまでリストに行を追加します。
クラスは、StepExecutionListener も実装しており、beforeStepとafterStepの2つの追加メソッドを提供します。 これらのメソッドを使用して、実行の実行の前後に物事を初期化して閉じます。
afterStep codeを見ると、結果リスト( lines)がジョブのコンテキストに配置され、次のステップで使用できるようになっていることがわかります。
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
この時点で、最初のステップはすでにその責任を果たしています。CSV行をメモリ内のリストにロードします。 2番目のステップに移動してそれらを処理しましょう。
4.5. LinesProcessor
LinesProcessorは、StepExecutionListener、そしてもちろんTaskletも実装します。つまり、 beforeStep 、 execute 、afterStepメソッドも実装します。
public class LinesProcessor implements Tasklet, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(
LinesProcessor.class);
private List<Line> lines;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(
line.getDob(),
LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}
ジョブのコンテキストから行リストをロードし、各人の年齢を計算することを理解するのは簡単です。
前の手順と同じオブジェクトで変更が行われるため、コンテキストに別の結果リストを配置する必要はありません。
そして、最後のステップの準備が整いました。
4.6. LinesWriter
LinesWriterのタスクは、行リストを調べて、名前と経過時間を出力ファイルに書き込むことです:
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}
これで、ジョブの実装は完了です。 テストを作成して実行し、結果を確認しましょう。
4.7. ジョブの実行
ジョブを実行するために、テストを作成します。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
ContextConfiguration アノテーションは、ジョブ定義を持つSpringコンテキスト構成クラスを指しています。
テストを実行する前に、いくつかのBeanを追加する必要があります。
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(transactionManager());
return factory.getObject();
}
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
すべての準備が整いました! さあ、テストを実行してください!
ジョブが終了すると、 output.csv に期待されるコンテンツが含まれ、ログに実行フローが表示されます。
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.
タスクレットは以上です。 これで、チャンクアプローチに進むことができます。
5。 チャンクアプローチ
5.1. はじめにとデザイン
名前が示すように、このアプローチはデータのチャンクに対してアクションを実行します。 つまり、すべての行を一度に読み取り、処理、および書き込むのではなく、一度に一定量のレコード(チャンク)を読み取り、処理し、書き込みます。
次に、ファイルにデータがなくなるまでこのサイクルを繰り返します。
その結果、フローはわずかに異なります。
- 行がある間:
- X行の量に対して行います:
- 1行読む
- 1行を処理します
- X行分を書き込みます。
- X行の量に対して行います:
したがって、チャンク指向アプローチ用に3つのBeanも作成する必要があります。
public class LineReader {
// ...
}
public class LineProcessor {
// ...
}
public class LinesWriter {
// ...
}
実装に移る前に、ジョブを構成しましょう。
5.2. 構成
ジョブ定義も異なって見えます:
@Configuration
@EnableBatchProcessing
public class ChunksConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}
@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}
@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}
@Bean
protected Step processLines(ItemReader<Line> reader,
ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
return steps.get("processLines").<Line, Line> chunk(2)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job() {
return jobs
.get("chunksJob")
.start(processLines(itemReader(), itemProcessor(), itemWriter()))
.build();
}
}
この場合、1つのタスクレットのみを実行するステップは1つだけです。
ただし、そのタスクレットは、データのチャンクを処理するリーダー、ライター、およびプロセッサーを定義します。
commit間隔は、1つのチャンクで処理されるデータの量を示していることに注意してください。 私たちの仕事は、一度に2行を読み取り、処理し、書き込みます。
これで、チャンクロジックを追加する準備が整いました。
5.3. LineReader
LineReader は、1つのレコードを読み取り、その内容とともにLineインスタンスを返す役割を果たします。
リーダーになるには、クラスはItemReaderインターフェイスを実装する必要があります。
public class LineReader implements ItemReader<Line> {
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null)
logger.debug("Read line: " + line.toString());
return line;
}
}
コードは単純で、1行を読み取って返すだけです。 このクラスの最終バージョンには、StepExecutionListenerも実装します。
public class LineReader implements
ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}
beforeStepとafterStepは、それぞれステップ全体の前と後に実行されることに注意してください。
5.4. LineProcessor
LineProcessor は、LineReaderとほぼ同じロジックに従います。
ただし、この場合、 ItemProcessorとそのメソッドprocess()を実装します。
public class LineProcessor implements ItemProcessor<Line, Line> {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
}
process()メソッドは入力行を受け取り、それを処理して出力行を返します。 ここでも、 StepExecutionListener:も実装します。
public class LineProcessor implements
ItemProcessor<Line, Line>, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug(
"Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}
5.5. LinesWriter
リーダーやプロセッサとは異なり、 LinesWriterは、 Lines:の List を受け取るように、linesのチャンク全体を書き込みます。
public class LinesWriter implements
ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
}
LinesWriterコードはそれ自体を話します。 そして再び、私たちは自分の仕事をテストする準備ができています。
5.6. ジョブの実行
タスクレットアプローチ用に作成したものと同じ新しいテストを作成します。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenChunksJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
上記でTaskletsConfigについて説明したように、 ChunksConfig を構成した後、テストを実行する準備が整いました。
ジョブが完了すると、 output.csv に再び期待される結果が含まれ、ログにフローが記述されていることがわかります。
[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.
同じ結果と異なるフローがあります。 ログは、このアプローチに従ってジョブがどのように実行されるかを明らかにします。
6. 結論
さまざまなコンテキストで、いずれかのアプローチの必要性が示されます。 タスクレットは「次々とタスク」のシナリオでより自然に感じられますが、チャンクは、ページ分割された読み取りや、大量のデータをメモリに保持したくない状況に対処するためのシンプルなソリューションを提供します。
この例の完全な実装は、GitHubプロジェクトにあります。