1. 序章

Parallel-collectors は、並列処理を可能にすると同時に、標準の並列ストリームの主な欠陥を回避するJavaStreamAPIコレクターのセットを提供する小さなライブラリです。

2. Mavenの依存関係

ライブラリの使用を開始する場合は、Mavenのpom.xmlファイルに次の1つのエントリを追加する必要があります。

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>1.1.0</version>
</dependency>

または、Gradleのビルドファイルの1行:

compile 'com.pivovarit:parallel-collectors:1.1.0'

最新バージョンは、MavenCentralにあります。

3. パラレルストリームの警告

パラレルストリームはJava8のハイライトの1つでしたが、重いCPU処理にのみ適用できることが判明しました。

この理由は、 Parallel StreamsがJVM全体で共有されるForkJoinPoolによって内部的にサポートされ、限定された並列処理を提供し、単一のJVMインスタンスで実行されるすべてのParallelStreamsによって使用されたためです。

たとえば、IDのリストがあり、それらを使用してユーザーのリストを取得したいとします。この操作にはコストがかかります。

そのためにParallelStreamsを使用できます。

List<Integer> ids = Arrays.asList(1, 2, 3); 
List<String> results = ids.parallelStream() 
  .map(i -> fetchById(i)) // each operation takes one second
  .collect(Collectors.toList()); 

System.out.println(results); // [user-1, user-2, user-3]

そして確かに、顕著なスピードアップがあることがわかります。 しかし、複数の並列ブロッキング操作を並行して実行し始めると問題が発生します。 これにより、プールがすぐに飽和状態になり、潜在的に巨大なレイテンシが発生する可能性があります。 そのため、無関係なタスクが相互の実行に影響を与えないように、個別のスレッドプールを作成してバルクヘッドを構築することが重要です。

カスタムForkJoinPoolインスタンスを提供するために、ここで説明するトリックを活用できましたが、このアプローチは文書化されていないハックに依存し、JDK10までは失敗していました。 詳細については、この号[JDK8190974]をご覧ください。

4. 動作中の並列コレクター

並列コレクターは、その名前が示すように、 collect()フェーズで追加の操作を並列に実行できる標準のStreamAPIコレクターです。

ParallelCollectors Collectors クラスをミラーリングする)クラスは、ライブラリの全機能へのアクセスを提供するファサードです。

上記の例をやり直したい場合は、次のように書くことができます。

ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

結果は同じですが、 カスタムスレッドプールを提供し、カスタム並列処理レベルを指定することができ、現在のスレッドをブロックすることなく、結果がCompletableFutureインスタンスにラップされて到着しました。 

一方、標準の並列ストリームでは、これらのいずれも実現できませんでした。

4.1. ParallelCollectors.parallelToList / ToSet()

直感的に理解できるように、 Stream を並行して処理し、結果をListまたはSetに収集する場合は、ParallelCollectorsを使用するだけです。 .parallelToListまたはparallelToSet

List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
  .collect(parallelToList(i -> fetchById(i), executor, 4))
  .join();

4.2. ParallelCollectors.parallelToMap()

Stream APIの場合と同様に、Stream要素をMapインスタンスに収集する場合は、次の2つのマッパーを提供する必要があります。

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
  .join(); // {1=user-1, 2=user-2, 3=user-3}

カスタムMapインスタンスSupplierを提供することもできます。

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
  .join();

そして、カスタムの競合解決戦略:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
  .join();

4.3. ParallelCollectors.parallelToCollection()

上記と同様に、カスタムコンテナにパッケージ化された結果を取得する場合は、カスタムコレクションサプライヤを渡すことができます。

List<String> results = ids.stream()
  .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
  .join();

4.4. ParallelCollectors.parallelToStream()

上記では不十分な場合は、実際に Stream インスタンスを取得して、そこでカスタム処理を続行できます。

Map<Integer, List<String>> results = ids.stream()
  .collect(parallelToStream(i -> fetchById(i), executor, 4))
  .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
  .join();

4.5. ParallelCollectors.parallel()

これにより、結果を完了順にストリーミングできます。

ids.stream()
  .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-3
// user-2

この場合、ランダムな処理遅延を導入したため、コレクターは毎回異なる結果を返すことが期待できます。

4.6. ParallelCollectors.parallelOrdered()

この機能により、上記と同じように結果をストリーミングできますが、元の順序が維持されます。

ids.stream()
  .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-2 
// user-3 

この場合、コレクターは常に順序を維持しますが、上記よりも遅くなる可能性があります。

5. 制限事項

執筆時点では、並列コレクターは、短絡操作が使用されている場合でも、無限ストリームでは機能しません。これは、StreamAPI内部によって課せられる設計上の制限です。 簡単に言うと、 Stream はコレクターを非短絡操作として扱うため、ストリームは終了する前にすべてのアップストリーム要素を処理する必要があります。

もう1つの制限は、短絡操作は、短絡後に残りのタスクを中断しないことです。

6. 結論

並列コレクターライブラリを使用して、カスタムJava Stream API CollectorsおよびCompletableFuturesを使用して並列処理を実行し、カスタムスレッドプール、並列処理、および非ブロッキングスタイルの[ X239X]CompletableFutures。

いつものように、コードスニペットはGitHub利用できます。

詳細については、GitHubの parallel-collectorsライブラリ作成者のブログ、および作成者のTwitterアカウントを参照してください。