春のバッチ – タスクレット対チャンク
1前書き
Spring Batch
は、ジョブを実装するための2つの異なる方法を提供しています:タスクレットとチャンクの使用** 。
この記事では、実際の簡単な例を使用して両方の方法を構成および実装する方法を学習します。
2依存関係
まず必要な依存関係を追加することから始めましょう。
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>4.0.0.RELEASE</version>
<scope>test</scope>
</dependency>
spring-batch-core
とhttps://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22org.springframework.batch%22%20AND%20a%3A%22spring-batch-test %22[spring-batch-test]は、Maven Centralを参照してください。
3私たちのユースケース
次の内容のCSVファイルを考えてみましょう。
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992
各行の
最初の位置は人の名前を表し、2番目の位置は彼/彼女の生年月日
を表します。
私たちのユースケースは、各人の名前と年齢を含む別のCSVファイルを生成することです。
Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
ドメインが明確になったので、次に進み、両方のアプローチを使ってソリューションを構築しましょう。タスクレットから始めましょう。
4タスクレットアプローチ
4.1. 紹介とデザイン
タスクレットは、ステップ内で単一のタスクを実行するためのものです。私たちの仕事は次々に実行されるいくつかのステップから成ります。 ** 各ステップは、定義済みのタスクを1つだけ実行します。
私たちの仕事は3つのステップからなります。
-
入力CSVファイルから行を読み取ります.
-
入力CSVファイル内のすべての人の年齢を計算します.
-
各自の名前と年齢を新しい出力CSVファイルに書き込みます.
全体像の準備ができたので、ステップごとに1つのクラスを作成しましょう。
LinesReader
は、入力ファイルからのデータの読み取りを担当します。
public class LinesReader implements Tasklet {
//...
}
LinesProcessor
は、ファイル内のすべての人の年齢を計算します。
public class LinesProcessor implements Tasklet {
//...
}
最後に、
LinesWriter
が名前と年齢を出力ファイルに書き込む責任を持ちます。
public class LinesWriter implements Tasklet {
//...
}
この時点で、私たちのすべてのステップは
Tasklet
インターフェースを実装しています** 。それは私たちにその
execute
メソッドを実装することを強制します。
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
//...
}
この方法で、各ステップのロジックを追加します。そのコードから始める前に、私たちの仕事を設定しましょう。
4.2. 構成
Springのアプリケーションコンテキストに設定を追加する必要があります。
前のセクションで作成したクラスに標準のBean宣言を追加した後、ジョブ定義を作成する準備が整いました。
@Configuration
@EnableBatchProcessing
public class TaskletsConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
protected Step readLines() {
return steps
.get("readLines")
.tasklet(linesReader())
.build();
}
@Bean
protected Step processLines() {
return steps
.get("processLines")
.tasklet(linesProcessor())
.build();
}
@Bean
protected Step writeLines() {
return steps
.get("writeLines")
.tasklet(linesWriter())
.build();
}
@Bean
public Job job() {
return jobs
.get("taskletsJob")
.start(readLines())
.next(processLines())
.next(writeLines())
.build();
}
//...
}
これは、私たちの「taskletsJob」が3つのステップからなることを意味します。最初のもの(
readLines
)は、Bean
linesReader
に定義されているタスクレットを実行し、次のステップに進みます:
processLines。 ProcessLines
は、Bean
linesProcessor
に定義されているタスクレットを実行し、最後のステップである
writeLines
に進みます。
私たちの仕事の流れは定義されました、そして我々はいくつかの論理を追加する準備ができています!
** 4.3. モデルと使用法
CSVファイルの行を操作するので、クラス__Lineを作成します。
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
//standard constructor, getters, setters and toString implementation
}
Line
は
Serializableを実装しています。これは、
Line__がステップ間でデータを転送するためのDTOとして機能するためです。 Spring Batchによれば、ステップ間で転送される
オブジェクトはシリアライズ可能でなければなりません
。
一方、行の読み書きについて考え始めることができます。
そのために、OpenCSVを利用します。
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>4.1</version>
</dependency>
Maven Centralで最新のhttps://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22com.opencsv%22%20AND%20a%3A%22opencsv%22[OpenCSV]バージョンを探します。
OpenCSVが含まれたら、** FileUtils__クラスも作成します。 CSV行を読み書きするためのメソッドを提供します。
public class FileUtils {
public Line readLine() throws Exception {
if (CSVReader == null)
initReader();
String[]line = CSVReader.readNext();
if (line == null)
return null;
return new Line(
line[0],
LocalDate.parse(
line[1],
DateTimeFormatter.ofPattern("MM/dd/yyyy")));
}
public void writeLine(Line line) throws Exception {
if (CSVWriter == null)
initWriter();
String[]lineStr = new String[2];
lineStr[0]= line.getName();
lineStr[1]= line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
}
//...
}
readLine
は、OpenCSVの
readNext
メソッドのラッパーとして機能し、
Line
オブジェクトを返します。
同様に、
writeLine
はOpenCSVの
writeNext
が
Line
オブジェクトを受け取るのをラップします。このクラスの完全な実装はhttps://github.com/eugenp/tutorials/blob/master/spring-batch/src/main/java/org/baeldung/taskletsvschunks/utils/FileUtils.java[GitHubプロジェクト]にあります]。
この時点で、私たちはすべて各ステップの実装から始める準備が整いました。
4.4.
LinesReader
さあ、
LinesReader
クラスを完成させましょう。
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<>();
fu = new FileUtils(
"taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}
LinesReaderのexecute
メソッドは、入力ファイルパス上に
FileUtils
インスタンスを作成します。次に、
これ以上読み込む行がなくなるまで
をリストに追加します。
私たちのクラスは、
BeforeStep
と
afterStep
の2つの追加メソッドを提供する
StepExecutionListener
** も実装しています。
execute
の実行前後に、これらのメソッドを使用して初期化して閉じます。
afterStep
コードを見ると、結果リスト(
lines)
が次のステップで利用できるようにするためにジョブのコンテキスト内に配置されている行に注意してください。
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
この時点で、最初のステップはすでにその責任を果たしています。
CSV行をメモリ内の
List
にロードします。 2番目のステップに進み、それらを処理しましょう。
4.5.
LinesProcessor
-
LinesProcessor
は、
StepExecutionListener
、そしてもちろん
Tasklet
も実装するでしょう。
public class LinesProcessor implements Tasklet, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(
LinesProcessor.class);
private List<Line> lines;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(
line.getDob(),
LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}
仕事の前後関係から
lines
リストを読み込み、各人の年齢を計算すること** を理解するのは簡単です。
前の手順と同じオブジェクトに対して変更が行われるため、別の結果リストをコンテキストに含める必要はありません。
そして、私たちは最後のステップの準備が整いました。
4.6.
LinesWriter
-
LinesWriter
のタスクは
lines
リストを調べて名前と年齢を出力ファイルに書き込むことです** :
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}
仕事の実装は完了しました。実行するテストを作成して結果を確認しましょう。
4.7. ジョブを実行する
仕事を実行するには、テストを作成します。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenTaskletsJob__whenJobEnds__thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
ContextConfiguration
アノテーションはSpringのコンテキスト設定クラスを指しています。これは私たちのジョブ定義を持っています。
テストを実行する前に、いくつか追加のBeanを追加する必要があります。
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
MapJobRepositoryFactoryBean factory
= new MapJobRepositoryFactoryBean();
factory.setTransactionManager(transactionManager());
return (JobRepository) factory.getObject();
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
すべて準備が整いました。さあ、テストを始めましょう。
ジョブが終了した後、
output.csv
は期待された内容を持ち、ログは実行フローを示します。
----[main]DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.[main]DEBUG o.b.t.tasklets.LinesReader - Read line:[Mae Hodges,10/22/1972][main]DEBUG o.b.t.tasklets.LinesReader - Read line:[Gary Potter,02/22/1953][main]DEBUG o.b.t.tasklets.LinesReader - Read line:[Betty Wise,02/17/1968][main]DEBUG o.b.t.tasklets.LinesReader - Read line:[Wayne Rose,04/06/1977][main]DEBUG o.b.t.tasklets.LinesReader - Read line:[Adam Caldwell,09/27/1995][main]DEBUG o.b.t.tasklets.LinesReader - Read line:[Lucille Phillips,05/14/1992][main]DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.[main]DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.[main]DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line[Mae Hodges,10/22/1972][main]DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line[Gary Potter,02/22/1953][main]DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line[Betty Wise,02/17/1968][main]DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line[Wayne Rose,04/06/1977][main]DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line[Adam Caldwell,09/27/1995][main]DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line[Lucille Phillips,05/14/1992][main]DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.[main]DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.[main]DEBUG o.b.t.tasklets.LinesWriter - Wrote line[Mae Hodges,10/22/1972,45][main]DEBUG o.b.t.tasklets.LinesWriter - Wrote line[Gary Potter,02/22/1953,64][main]DEBUG o.b.t.tasklets.LinesWriter - Wrote line[Betty Wise,02/17/1968,49][main]DEBUG o.b.t.tasklets.LinesWriter - Wrote line[Wayne Rose,04/06/1977,40][main]DEBUG o.b.t.tasklets.LinesWriter - Wrote line[Adam Caldwell,09/27/1995,22][main]DEBUG o.b.t.tasklets.LinesWriter - Wrote line[Lucille Phillips,05/14/1992,25][main]DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.
----
タスクレットについてはこれで終わりです。これで、チャンクアプローチに進むことができます。
5チャンクアプローチ
5.1. 紹介とデザイン
その名前が示すように、このアプローチはデータの塊にアクションを実行します。つまり、一度にすべての行を読み取り、処理、書き込みするのではなく、一度に一定量のレコード(チャンク)を読み取り、処理し、書き込みます。
その後、ファイルにデータがなくなるまでこのサイクルを繰り返します。
その結果、フローはわずかに異なります。
-
行がありますが:
-
X行分
-
** 一行読む
-
** 1行処理
-
X行書いてください。
-
そのため、チャンク指向のアプローチのために** 3つのBeanを作成する必要があります。
public class LineReader {
//...
}
public class LineProcessor {
//...
}
public class LinesWriter {
//...
}
実装に移る前に、私たちの仕事を設定しましょう。
5.2. 構成
仕事の定義も異なって見えるでしょう:
@Configuration
@EnableBatchProcessing
public class ChunksConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}
@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}
@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}
@Bean
protected Step processLines(ItemReader<Line> reader,
ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
return steps.get("processLines").<Line, Line> chunk(2)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job() {
return jobs
.get("chunksJob")
.start(processLines(itemReader(), itemProcessor(), itemWriter()))
.build();
}
}
この場合、タスクレットを1つだけ実行するステップは1つだけです。
ただし、そのタスクレットは、大量のデータを処理するリーダー、ライター、およびプロセッサを定義します。
-
コミット間隔は、1チャンク** で処理されるデータ量を示します。私たちの仕事は一度に2行を読み、処理し、書くでしょう。
これで、チャンクロジックを追加する準備が整いました。
5.3.
LineReader
LineReader
は、1つのレコードを読み取り、その内容を含む
Line
インスタンスを返すことを担当します。
読者になるためには、
私たちのクラスは
ItemReader
インターフェースを実装する必要があります
:
public class LineReader implements ItemReader<Line> {
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null)
logger.debug("Read line: " + line.toString());
return line;
}
}
コードは簡単です、それはただ1行を読んでそれを返します。
このクラスの最終バージョンにも
StepExecutionListener
を実装します。
public class LineReader implements
ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}
beforeStep
と
afterStep
はそれぞれステップ全体の前後に実行されることに注意してください。
5.4.
LineProcessor
LineProcessor
は、
LineReader
とほぼ同じロジックに従います。
ただし、この場合は** ItemProcessorとそのメソッド__process()を実装します。
public class LineProcessor implements ItemProcessor<Line, Line> {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
}
-
process()
メソッドは入力行を受け取り、それを処理して出力行を返します** 。繰り返しますが、__StepExecutionListenerも実装します。
public class LineProcessor implements
ItemProcessor<Line, Line>, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug(
"Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}
5.5.
LinesWriter
リーダーやプロセッサとは異なり、
LinesWriter
は
Linesの
List__を受け取るように、行全体のチャンク
を書き込みます。
public class LinesWriter implements
ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
}
LinesWriter
コードはそれ自体を代弁します。またしても、仕事をテストする準備が整いました。
5.6. ジョブを実行する
タスクレットアプローチ用に作成したものと同じ新しいテストを作成します。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenChunksJob__whenJobEnds__thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
TaskletsConfig
について上記で説明したように
ChunksConfig
を設定した後、テストを実行する準備が整いました。
仕事が終わったら、
output.csv
には期待される結果が再度含まれていることがわかります。ログにはフローが記述されています。
----[main]DEBUG o.b.t.chunks.LineReader - Line Reader initialized.[main]DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.[main]DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.[main]DEBUG o.b.t.chunks.LineReader - Read line:[Mae Hodges,10/22/1972][main]DEBUG o.b.t.chunks.LineReader - Read line:[Gary Potter,02/22/1953][main]DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line[Mae Hodges,10/22/1972][main]DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line[Gary Potter,02/22/1953][main]DEBUG o.b.t.chunks.LinesWriter - Wrote line[Mae Hodges,10/22/1972,45][main]DEBUG o.b.t.chunks.LinesWriter - Wrote line[Gary Potter,02/22/1953,64][main]DEBUG o.b.t.chunks.LineReader - Read line:[Betty Wise,02/17/1968][main]DEBUG o.b.t.chunks.LineReader - Read line:[Wayne Rose,04/06/1977][main]DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line[Betty Wise,02/17/1968][main]DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line[Wayne Rose,04/06/1977][main]DEBUG o.b.t.chunks.LinesWriter - Wrote line[Betty Wise,02/17/1968,49][main]DEBUG o.b.t.chunks.LinesWriter - Wrote line[Wayne Rose,04/06/1977,40][main]DEBUG o.b.t.chunks.LineReader - Read line:[Adam Caldwell,09/27/1995][main]DEBUG o.b.t.chunks.LineReader - Read line:[Lucille Phillips,05/14/1992][main]DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line[Adam Caldwell,09/27/1995][main]DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line[Lucille Phillips,05/14/1992][main]DEBUG o.b.t.chunks.LinesWriter - Wrote line[Adam Caldwell,09/27/1995,22][main]DEBUG o.b.t.chunks.LinesWriter - Wrote line[Lucille Phillips,05/14/1992,25][main]DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.[main]DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.[main]DEBUG o.b.t.chunks.LineReader - Line Reader ended.
----
-
結果は同じでフローは異なります。ログは、このアプローチに従ってジョブがどのように実行されるかを明らかにします。
6. 結論
状況が異なると、どちらかの方法が必要であることがわかります。
-
タスクレットが他のシナリオの後のタスクに対してより自然に感じる一方で、チャンクは、ページ付けされた読み取りや、大量のデータをメモリに保存したくない状況に対処するための簡単な解決策を提供します。
この例の完全な実装はhttps://github.com/eugenp/tutorials/tree/master/spring-batch[
GitHubプロジェクト
]にあります。