Java Parallel Collectorsライブラリのガイド

1. 前書き

https://github.com/pivovarit/parallel-collectors[Parallel-collectors]は、並列処理を可能にするJava Stream APIコレクターのセットを提供する小さなライブラリであり、同時に標準のParallel Streamsの主な欠陥を回避します。 。

2. Mavenの依存関係

ライブラリの使用を開始する場合は、Mavenの_pom.xml_ファイルに単一のエントリを追加する必要があります。
<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>1.1.0</version>
</dependency>
または、Gradleのビルドファイルの1行:
compile 'com.pivovarit:parallel-collectors:1.1.0'
最新バージョンhttps://search.maven.org/search?q=g:com.pivovarit%20AND%20a:parallel-collectors

3. 並列ストリームの警告

Parallel StreamsはJava 8のハイライトの1つでしたが、重いCPU処理のみに適用できることが判明しました。
その理由は、*並列ストリームが、JVM全体の共有_ForkJoinPool_によって内部的に支援されていたためです。
たとえば、idのリストがあり、それらを使用してユーザーのリストを取得し、この操作に負荷がかかることを想像してください。
そのためにParallel Streamsを使用できます。
List<Integer> ids = Arrays.asList(1, 2, 3);
List<String> results = ids.parallelStream()
  .map(i -> fetchById(i)) // each operation takes one second
  .collect(Collectors.toList());

System.out.println(results); // [user-1, user-2, user-3]
そして確かに、顕著な高速化があることがわかります。 しかし、複数の並列ブロッキング操作を並行して実行し始めると問題が生じます。 *これにより、プールがすぐに飽和状態になる可能性があり*、潜在的に巨大なレイテンシーが発生します。 そのため、独立したスレッドプールを作成してバルクヘッドを構築し、無関係なタスクが相互の実行に影響を与えないようにすることが重要です。
custom _ForkJoinPool_インスタンスを提供するために、https://www.baeldung.com/java-8-parallel-streams-custom-threadpool [ここで説明するトリック]を活用できますが、このアプローチは文書化されていないハックに依存しており、 JDK10までは問題があります。 問題自体の詳細については、https://bugs.openjdk.java.net/browse/JDK-8190974 [[JDK8190974]]を参照してください。

4. アクションの並列コレクター

*名前が示すように、パラレルコレクターは、_collect()_フェーズで追加の操作を並行して実行できる標準のストリームAPIコレクターです。*
_ParallelCollectors_(_Collectors_クラスをミラーリングする)クラスは、ライブラリのすべての機能へのアクセスを提供するファサードです。
上記の例をやり直したい場合は、次のように書くことができます。
ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]
結果は同じですが、*カスタムスレッドプールを提供し、カスタム並列処理レベルを指定することができ、結果はlink:/java-completablefuture[_CompletableFuture_]インスタンスにラップされて到着しました現在のスレッドをブロックします。 *
一方、標準パラレルストリームでは、これらのいずれも達成できませんでした。

4.1. ParallelCollectors.parallelToList / ToSet()

直感的に理解できるように、_Stream_を並列処理して結果を_List_または_Set_に収集する場合は、_ParallelCollectors.parallelToList_または_parallelToSet_を使用するだけです。
List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
  .collect(parallelToList(i -> fetchById(i), executor, 4))
  .join();

4.2. ParallelCollectors.parallelToMap()

Stream APIと同様に、_Stream_要素を_Map_インスタンスに収集する場合は、2つのマッパーを提供する必要があります。
List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
  .join(); // {1=user-1, 2=user-2, 3=user-3}
カスタム_Map_インスタンス_Supplier_を提供することもできます。
Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
  .join();
そして、カスタムの競合解決戦略:
List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
  .join();

4.3. ParallelCollectors.parallelToCollection()

上記と同様に、カスタムコンテナにパッケージ化された結果を取得する場合は、カスタム_Collection Supplier_を渡すことができます。
List<String> results = ids.stream()
  .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
  .join();

4.4. ParallelCollectors.parallelToStream()

上記では不十分な場合、実際に_Stream_インスタンスを取得して、そこでカスタム処理を続行できます。
Map<Integer, List<String>> results = ids.stream()
  .collect(parallelToStream(i -> fetchById(i), executor, 4))
  .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
  .join();

4.5. ParallelCollectors.parallel()

これにより、結果を完了順にストリーミングできます。
ids.stream()
  .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-3
// user-2
この場合、ランダムな処理遅延が発生するため、コレクターは毎回異なる結果を返すことが予想されます。

4.6. ParallelCollectors.parallelOrdered()

この機能により、上記と同様にストリーミング結果が可能になりますが、元の順序は維持されます。
ids.stream()
  .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-2
// user-3
この場合、コレクターは常に順序を維持しますが、上記よりも遅くなる可能性があります。

5. 制限事項

執筆時点では、*並列コレクターは、短絡操作が使用されている場合でも、無限ストリーム*では動作しません。これは、Stream API内部によって課される設計上の制限です。 簡単に言うと、__ Stream__sはコレクターを非短絡操作として扱うため、ストリームはすべてのアップストリーム要素を処理してから終了する必要があります。
他の制限は、*短絡操作は、短絡後に残りのタスクを中断しない*ことです。

6. 結論

並列コレクターライブラリを使用して、カスタムJavaプールAPI _Collectors_および_CompletableFutures_を使用して並列処理を実行し、カスタムスレッドプール、並列処理、および_CompletableFuturesの非ブロックスタイルを使用する方法を確認しました。
いつものように、コードスニペットはhttps://github.com/eugenp/tutorials/tree/master/libraries-2[GitHubで]で入手できます。
詳細については、GitHubのhttps://github.com/pivovarit/parallel-collectors[parallel-collectorsライブラリ]、http://4comprehension.com [著者のブログ]、および著者のhttps:// twitterを参照してください。 com / pivovarit [Twitterアカウント]。