1前書き

この記事では、Spring Batchの実用的なコード中心のイントロに焦点を当てます。 Spring Batchは、堅牢なジョブ実行のために設計された処理フレームワークです。

現在のバージョン3.0で、Spring 4とJava 8をサポートしています。また、バッチ処理用の新しいJava仕様であるJSR-352にも対応しています。


ここには

このフレームワークの興味深く実用的なユースケースがいくつかあります。

** 2ワークフローの基本

**


image、width = 738、height = 294

Springバッチは、ジョブリポジトリがジョブのスケジューリングと対話の仕事をするという伝統的なバッチアーキテクチャに従います。

ジョブは複数のステップを持つことができます – そして、各ステップは通常データの読み取り、処理、書き込みの順序に従います。

そしてもちろん、フレームワークは、特にジョブを扱う低レベルの永続化作業に関しては、ここで私たちのために大部分の重い作業を行います – ジョブリポジトリに

sqlite

を使用します。


2.1. 私たちの例のユースケース

ここで取り組む簡単なユースケースは、金融取引データをCSVからXMLに移行することです。

入力ファイルは非常に単純な構造を持っています – それは1行ごとのトランザクションを含み、それは以下から構成されます:ユーザー名、ユーザーID、トランザクションの日付と金額

username, userid, transaction__date, transaction__amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411

** 3 Mavenポンポン

**

このプロジェクトに必要な依存関係は、Spring Core、Spring Batch、および

sqlite

jdbc connectorです。

<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.baeldung</groupId>
    <artifactId>spring-batch-intro</artifactId>
    <version>0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-batch-intro</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>4.2.0.RELEASE</spring.version>
        <spring.batch.version>3.0.5.RELEASE</spring.batch.version>
        <sqlite.version>3.8.11.2</sqlite.version>
    </properties>

    <dependencies>
        <!-- SQLite database driver -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>${sqlite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
    </dependencies>
</project>


4 Spring Batch Config

最初にすることは、Spring BatchをXMLで構成することです。

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:jdbc="http://www.springframework.org/schema/jdbc"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/jdbc
    http://www.springframework.org/schema/jdbc/spring-jdbc-4.2.xsd
">

    <!-- connect to SQLite database -->
    <bean id="dataSource"
      class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="org.sqlite.JDBC"/>
        <property name="url" value="jdbc:sqlite:repository.sqlite"/>
        <property name="username" value=""/>
        <property name="password" value=""/>
    </bean>

    <!-- create job-meta tables automatically -->
    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script
          location="org/springframework/batch/core/schema-drop-sqlite.sql"/>
        <jdbc:script location="org/springframework/batch/core/schema-sqlite.sql"/>
    </jdbc:initialize-database>

    <!-- stored job-meta in memory -->
    <!--
    <bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    -->

    <!-- stored job-meta in database -->
    <bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="transactionManager" ref="transactionManager"/>
        <property name="databaseType" value="sqlite"/>
    </bean>

    <bean id="transactionManager" class=
      "org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

</beans>

もちろん、Java設定も利用可能です。

@Configuration
@EnableBatchProcessing
public class SpringConfig {

    @Value("org/springframework/batch/core/schema-drop-sqlite.sql")
    private Resource dropReopsitoryTables;

    @Value("org/springframework/batch/core/schema-sqlite.sql")
    private Resource dataReopsitorySchema;

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("org.sqlite.JDBC");
        dataSource.setUrl("jdbc:sqlite:repository.sqlite");
        return dataSource;
    }

    @Bean
    public DataSourceInitializer dataSourceInitializer(DataSource dataSource)
      throws MalformedURLException {
        ResourceDatabasePopulator databasePopulator =
          new ResourceDatabasePopulator();

        databasePopulator.addScript(dropReopsitoryTables);
        databasePopulator.addScript(dataReopsitorySchema);
        databasePopulator.setIgnoreFailedDrops(true);

        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);
        initializer.setDatabasePopulator(databasePopulator);

        return initializer;
    }

    private JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource());
        factory.setTransactionManager(getTransactionManager());
        factory.afterPropertiesSet();
        return (JobRepository) factory.getObject();
    }

    private PlatformTransactionManager getTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}


5 Spring Batch Job Config

それでは、CSVからXMLへの作業の仕事の説明を書きましょう。

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
">

    <import resource="spring.xml"/>

    <bean id="record" class="org.baeldung.spring__batch__intro.model.Transaction"></bean>
    <bean id="itemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">

        <property name="resource" value="input/record.csv"/>

        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class=
                      "org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="username,userid,transactiondate,amount"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.baeldung.spring__batch__intro.service.RecordFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="itemProcessor"
      class="org.baeldung.spring__batch__intro.service.CustomItemProcessor"/>

    <bean id="itemWriter"
      class="org.springframework.batch.item.xml.StaxEventItemWriter">
        <property name="resource" value="file:xml/output.xml"/>
        <property name="marshaller" ref="recordMarshaller"/>
        <property name="rootTagName" value="transactionRecord"/>
    </bean>

    <bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound">
            <list>
                <value>org.baeldung.spring__batch__intro.model.Transaction</value>
            </list>
        </property>
    </bean>
    <batch:job id="firstBatchJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="itemReader" writer="itemWriter"
                  processor="itemProcessor" commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>

そしてもちろん、同様のJavaベースのジョブ設定:

public class SpringBatchConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Value("input/record.csv")
    private Resource inputCsv;

    @Value("file:xml/output.xml")
    private Resource outputXml;

    @Bean
    public ItemReader<Transaction> itemReader()
      throws UnexpectedInputException, ParseException {
        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        String[]tokens = { "username", "userid", "transactiondate", "amount" };
        tokenizer.setNames(tokens);
        reader.setResource(inputCsv);
        DefaultLineMapper<Transaction> lineMapper =
          new DefaultLineMapper<Transaction>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> itemProcessor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
      throws MalformedURLException {
        StaxEventItemWriter<Transaction> itemWriter =
          new StaxEventItemWriter<Transaction>();
        itemWriter.setMarshaller(marshaller);
        itemWriter.setRootTagName("transactionRecord");
        itemWriter.setResource(outputXml);
        return itemWriter;
    }

    @Bean
    public Marshaller marshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(new Class[]{ Transaction.class });
        return marshaller;
    }

    @Bean
    protected Step step1(ItemReader<Transaction> reader,
      ItemProcessor<Transaction, Transaction> processor,
      ItemWriter<Transaction> writer) {
        return steps.get("step1").<Transaction, Transaction> chunk(10)
          .reader(reader).processor(processor).writer(writer).build();
    }

    @Bean(name = "firstBatchJob")
    public Job job(@Qualifier("step1") Step step1) {
        return jobs.get("firstBatchJob").start(step1).build();
    }
}

それでは、全体の設定が完了したので、それを細分化して説明しましょう。


5.1.

ItemReader


を使用してデータを読み取り、オブジェクトを作成する

最初に

cvsFileItemReader

を設定して、

record.csv

からデータを読み取り、それを

Transaction

オブジェクトに変換します。

@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private Date transactionDate;
    private double amount;

   /**  getters and setters for the attributes ** /
    @Override
    public String toString() {
        return "Transaction[username=" + username + ", userId=" + userId
          + ", transactionDate=" + transactionDate + ", amount=" + amount
          + "]";
    }
}

そうするために – それはカスタムマッパーを使います:

public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
        Transaction transaction = new Transaction();

        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt(1));
        transaction.setAmount(fieldSet.readDouble(3));
        String dateString = fieldSet.readString(2);
        try {
            transaction.setTransactionDate(dateFormat.parse(dateString));
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return transaction;
    }
}


5.2.

ItemProcessor


によるデータ処理

独自のアイテムプロセッサ

CustomItemProcessor

を作成しました。これはトランザクションオブジェクトに関連するものは何も処理しません – それはリーダーからライターに来る元のオブジェクトを渡すだけです。

public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    public Transaction process(Transaction item) {
        return item;
    }
}


5.3.

ItemWriter


を使用してFSにオブジェクトを書き込む

最後に、この

transaction



xml/output.xml

にあるxmlファイルに格納します。

<bean id="itemWriter"
  class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" value="file:xml/output.xml"/>
    <property name="marshaller" ref="recordMarshaller"/>
    <property name="rootTagName" value="transactionRecord"/>
</bean>


5.4. バッチジョブの設定

そのため、

batch:job

構文を使用して、ドットとジョブを関連付けるだけです。


commit-interval

に注意してください。これは、バッチを

itemWriter

にコミットする前にメモリに保持されるトランザクションの数です。その時点まで(または入力データの終わりに達するまで)トランザクションをメモリに保持します。

<batch:job id="firstBatchJob">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="itemProcessor" commit-interval="10">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>


5.5. バッチジョブの実行

これで終わりです – では、すべてをセットアップして実行しましょう。

public class App {
    public static void main(String[]args) {
       //Spring Java config
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(SpringConfig.class);
        context.register(SpringBatchConfig.class);
        context.refresh();

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("firstBatchJob");
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        }
    }
}


6. 結論

このチュートリアルでは、

Spring Batchの使い方

と単純なユースケースでの使い方の基本的な考え方を説明します。

バッチ処理パイプラインを簡単に開発する方法、および読み取り、処理、書き込みのさまざまな段階をカスタマイズする方法を示します。

このチュートリアルの

完全な実装

はhttps://github.com/eugenp/tutorials/tree/master/spring-batch[githubプロジェクト]にあります – これはEclipseベースのプロジェクトなので、簡単にできます。そのままインポートして実行します。