スプリングバッチ分割、幅= 500、高さ= 400

__フォトクレジット :

Spring Batchでは、「Partitioning」は「それぞれの範囲のデータを処理する複数のスレッド」です。たとえば、1つのテーブルに100個のレコードがあり、「1次ID」が1から100まで割り当てられており、100レコード全体を処理したいとします。

通常、プロセスは1から100まで開始されます.1つのスレッドの例です。プロセスは10分かかると見積もられます。

Single Thread - Process from 1 to 100

「パーティショニング」では、それぞれ10個のレコードを処理するために10個のスレッドを開始できます( ‘id’の範囲に基づいて)。今、プロセスは完了するのにわずか1分かかります。

Thread 1 - Process from 1 to 10
Thread 2 - Process from 11 to 20
Thread 3 - Process from 21 to 30
......

スレッド9  -  81から90までのプロセス
スレッド10  -  91から100までのプロセス

「パーティショニング」手法を実装するには、構造体を理解する必要があります
処理する入力データの “データの範囲”を計画できるように、
正しく。

1.チュートリアル

このチュートリアルでは、 “Partitioner”ジョブを作成する方法、
これは10個のスレッドを持ち、各スレッドはデータベースからレコードを読み込み、
指定された ‘id’の範囲に基づいています。

使用されるツールとライブラリ

。 Maven 3
。 Eclipse 4.2
。 JDK 1.6
。 Spring Core 3.2.2.RELEASE
。 Spring Batch 2.2.0.RELEASE
。 MySQLのJavaドライバ5.1.25

__P.S “users”テーブルに100個のレコードがあると仮定します。

ユーザーの表構造

id、user__login、user__passs、age

1、user__1、pass__1,20
2、user__2、pass__2,40
3、user__3、pass__3,70
4、user__4、pass__4,5
5、user__5、pass__5,52
......
99、user__99、pass__99,89
100、user__100、pass__100,76

プロジェクトディレクトリ構造

標準のMavenプロジェクトである最終的なプロジェクト構造を見直してください。


spring-batch-partitioner-before、width = 451、height = 506

3.パーティショナー

まず、

Partitioner`実装を作成し、


partitioning range

‘を

ExecutionContext`に入れます。後で、バッチジョブのXMLファイルで `fromId`と

tied`を宣言します。

この場合、分割範囲は次のようになります。

Thread 1 = 1 - 10
Thread 2 = 11 - 20
Thread 3 = 21 - 30
......

スレッド10 = 91〜100

RangePartitioner.java

パッケージcom.mkyong.partition;

import java.util.HashMap; import java.util.Map;

インポートorg.springframework.batch.core.partition.support.Partitioner;インポートorg.springframework.batch.item.ExecutionContext;

パブリッククラスRangePartitionerはPartitioner {

@Overrideパブリックマップ<String、ExecutionContext> partition(int gridSize)

Map <String、ExecutionContext> result = newハッシュマップ<String、ExecutionContext>();

int range = 10; int fromId = 1; int toId = range;

for(int i = 1; i <= gridSize; i ++){ExecutionContext value = new ExecutionContext();

System.out.println( "\ nStarting:Thread" i); System.out.println( "fromId:" fromId); System.out.println( "toId:" toId);

value.putInt( "fromId"、fromId); value.putInt( "toId"、toId);

//give each thread a name, thread 1,2,3             value.putString("name", "Thread" + i);

result.put( "partition" i、value);

fromId = toId1; toId = range;

}

結果を返す。 }

}

4.バッチジョブ

バッチジョブのXMLファイルを見直してください。それは自明です。いくつかの点
ハイライトする:

。パーティショナーの場合、

grid-size =スレッド数


。 pagingItemReader Beanの場合、jdbcリーダーの例、

#{stepExecutionContext[fromId、toId]}`の値は、
rangePartitionerの `ExecutionContext`です。
。 itemProcessor beanの場合、 `#{stepExecutionContext[name]}`の値
rangePartitionerの `ExecutionContext`によって注入されます。
。ライターの場合、各スレッドは異なるcsvでレコードを出力します
ファイル、ファイル名形式 - `users.processed[fromId]} - [toId].csv

job-partitioner.xml

<?xml version = "1.0" encoding = "UTF-8"?> <beans xmlns = "http://www.springframework.org/schema/beans" xmlns:batch = "http://www.springframework.org/schema/batch "xmlns:util =" http://www.springframework.org/schema/util "xmlns:xsi =" http://www.w3.org/2001/XMLSchema-instance "xsi:schemaLocation =" http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http:///www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2 .xsd ">

<! -  spring batch core settings  - > <import resource = "../config/context.xml"/> <! - データベース設定 - > <import resource = "../config/database.xml"/>

<! -  partitioner job  - > <job id = "partitionJob" xmlns = "http://www.springframework.org/schema/batch"> <! - マスターステップ、10スレッド(グリッドサイズ) > <task id = "masterStep"> <partition step = "スレーブ" partitioner = "rangePartitioner"> <handler grid-size = "10" task-executor = "taskExecutor"/> </partition> </step>仕事>

<! - 各スレッドは、異なるstepExecutionContext値でこのジョブを実行します。 - > <id id = "slave" xmlns = "http://www.springframework.org/schema/batch"> <tasklet> <chunk reader = "pagingItemReader" writer = "flatFileItemWriter" processor = "itemProcessor" commit- interval = "1"/> </tasklet> </step>

<bean id = "rangePartitioner" class = "com.mkyong.partition.RangePartitioner"/>

<bean id = "taskExecutor" class = "org.springframework.core.task.SimpleAsyncTaskExecutor"/>

<! -  insert stepExecutionContext  - > <bean id = "itemProcessor" class = "com.mkyong.processor.UserProcessor" scope = "step"> <プロパティ名= "threadName" value = "#{stepExecutionContext[name]} "/> </bean>

<property id = "pagingItemReader" class = "org.springframework.batch.item.database.JdbcPagingItemReader" scope = "step"> <property name = "dataSource" ref = "dataSource"/> <property name = "queryProvider"> <bean class = "org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean"> <property name = "dataSource" ref = "dataSource"/> <property name = "selectClause" value = "select id、user__login、user__pass <property name = "whereClause" value = "where id> =:fromIdおよびid <=:toId"/> <property name = " rangePartitionerのExecutionContext経由で注入 - > <property name = "parameterValues"> <map> <entry key = "fromId" value = "id"/> </bean> "#{stepExecutionContext[fromId]}"/> <entry key = "toId" value = "#{stepExecutionContext[toId]}"/> </map> </property> <property name = "pageSize" value = "1 0 "/> <property name =" rowMapper "> <bean class =" com.mkyong.UserRowMapper "/> </property> </bean>

<! -  csv file writer  - > <bean id = "flatFileItemWriter" class = "org.springframework.batch.item.file.FlatFileItemWriter" scope = "step"> <プロパティ名= "リソース"値= "ファイル: <property name = "appendAllowed" value = "false"/> <property name = "lineAggregator"> <csv/outputs/users.processed> {stepExecutionContext[fromId]<bean name = "org.springframework.batch.item.file.transform.DelimitedLineAggregator"> <property name = "区切り記号" value = "、"/> <property name = "fieldExtractor"> <bean class = "org.springframework </property> </property> </bean> </bean> </bean> </property> </bean> >

</beans>

アイテムプロセッサクラスは、処理アイテムを印刷するために使用され、
現在実行中の「スレッド名」のみ。

UserProcessor.java – アイテムプロセッサ

パッケージcom.mkyong.processor;

import org.springframework.batch.item.ItemProcessor; import com.mkyong.User;

パブリッククラスUserProcessorは、ItemProcessor <User、User> {

プライベート文字列threadName;

@Override publicユーザプロセス(ユーザアイテム)は例外{

System.out.println(threadName "processing:" item.getId() ":" item.getUsername());

返却物; }

public String getThreadName(){returnスレッド名; }

public void setThreadName(String threadName){this.threadName = threadName; }

}

5.それを実行する

すべてをロードして実行する…​ 10スレッドが処理を開始する
データの提供範囲。

パッケージcom.mkyong;

import org.springframework.batch.core.Job;インポートorg.springframework.batch.core.JobExecution; org.springframework.batch.core.JobParametersをインポートします。 org.springframework.batch.core.JobParametersBuilderをインポートします。 org.springframework.batch.core.launch.JobLauncherをインポートします。インポートorg.springframework.context.ApplicationContext;インポートorg.springframework.context.support.ClassPathXmlApplicationContext;

パブリッククラスPartitionApp {

パブリックstatic void main(String[]args){PartitionApp obj = new PartitionApp(); obj.runTest(); }

プライベートvoid runTest(){

String[]springConfig = {"spring/batch/jobs/job-partitioner.xml"};

ApplicationContext context =新しいClassPathXmlApplicationContext(springConfig);

JobLauncher jobLauncher =(JobLauncher)context.getBean( "jobLauncher");ジョブジョブ=(ジョブ)context.getBean( "partitionJob");

試して{

ジョブ実行の実行= jobLauncher.run(job、new JobParameters()); System.out.println( "終了ステータス:" execution.getStatus()); System.out.println( "終了ステータス:" execution.getAllFailureExceptions());

} catch(例外e){e.printStackTrace();} }

System.out.println( "Done");

}
}

コンソール出力

開始:Thread1 from Id:1 to Id:10

開始:Thread2 from Id:11 to Id:20

開始:Thread3 from Id:21 to Id:30

開始:Thread4 from Id:31 to Id:40

開始:Thread5 fromId:41 to Id:50

開始:スレッド6 fromId:51からId:60

開始:スレッド7 fromId:61からId:70

開始:Thread8 from Id:71 to Id:80

開始:Thread9 fromId:81 to Id:90

開始:Thread10 from Id:91 to Id:100

スレッド8の処理:71:user__71
スレッド2処理:11:user__11
スレッド3処理:21:user__21
スレッド10の処理:91:user__91
スレッド4の処理:31:user__31
スレッド6の処理:51:user__51
スレッド5の処理:41:user__41
スレッド1の処理:1:user__1
スレッド9の処理:81:user__81
スレッド7の処理:61:user__61
スレッド2処理:12:user__12
スレッド7の処理:62:user__62
スレッド6の処理:52:user__52
スレッド1の処理:2:user__2
スレッド9の処理:82:user__82
......

処理が完了すると、10個のCSVファイルが作成されます。


spring-batch-partitioner-after、width = 435、height = 470

users.processed1-10.csv

1,user__1,pass__1,20
2,user__2,pass__2,40
3,user__3,pass__3,70
4,user__4,pass__4,5
5,user__5,pass__5,52
6,user__6,pass__6,69
7,user__7,pass__7,48
8,user__8,pass__8,34
9,user__9,pass__9,62
10,user__10,pass__10,21

6.その他

6.1また、アノテーションを介して `#{stepExecutionContext[name]}`を挿入することもできます。

UserProcessor.java – 注釈バージョン

package com.mkyong.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.mkyong.User;

@Component("itemProcessor")
@Scope(value = "step")
public class UserProcessor implements ItemProcessor<User, User> {

    @Value("#{stepExecutionContext[name]}")
    private String threadName;

    @Override
    public User process(User item) throws Exception {

        System.out.println(threadName + " processing : "
                     + item.getId() + " : " + item.getUsername());

        return item;
    }

}

忘れずに、Springコンポーネントの自動スキャンを有効にしてください。

    <context:component-scan base-package="com.mkyong"/>

6.2データベース・パーティション・リーダー – MongoDBの例

job-partitioner.xml

  <bean id="mongoItemReader" class="org.springframework.batch.item.data.MongoItemReader"
    scope="step">
    <property name="template" ref="mongoTemplate"/>
    <property name="targetType" value="com.mkyong.User"/>
    <property name="query"
      value="{
        'id':{$gt:#{stepExecutionContext[fromId]}, $lte:#{stepExecutionContext[toId]}
      } }"
   />
    <property name="sort">
        <util:map id="sort">
            <entry key="id" value=""/>
        </util:map>
    </property>
  </bean>

完了しました。

ソースコードをダウンロードする

ダウンロードする –

SpringBatch-Partitioner-Example.zip

(31 KB)

参考文献

JavaDoc]。

http://ja.wikipedia.org/wiki/Partition__(database

)[Wiki:データベース

パーティション]

リンク://タグ/パーティション/[パーティション]リンク://タグ/スプリングバッチ/[スプリングバッチ]

threads