Javaで並列ストリームを使用する場合
1. 概要
Java 8では、 Stream API が導入されました。これにより、コレクションをデータのストリームとして簡単に反復処理できます。 それもとても
より多くのコアで作業を分割する方が常に速いと思うかもしれません。 しかし、そうではないことがよくあります。
このチュートリアルでは、シーケンシャルストリームとパラレルストリームの違いについて説明します。 最初に、並列ストリームで使用されるデフォルトのフォーク結合プールを確認します。
また、メモリの局所性や分割/マージのコストなど、並列ストリームを使用した場合のパフォーマンスへの影響についても検討します。
最後に、シーケンシャルストリームをパラレルストリームに変換することが理にかなっている場合に推奨します。
2. Javaでのストリーム
Javaのstreamは、データソースの単なるラッパーであり、便利な方法でデータに対して一括操作を実行できます。
データを保存したり、基になるデータソースに変更を加えたりすることはありません。 むしろ、データパイプラインでの機能スタイルの操作のサポートを追加します。
2.1. シーケンシャルストリーム
デフォルトでは、
シーケンシャルストリームは、単一のスレッドを使用してパイプラインを処理します。
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);
このシーケンシャルストリームの出力は予測可能です。 リスト要素は常に順序付けられた順序で印刷されます。
1 main
2 main
3 main
4 main
2.2. パラレルストリーム
Javaのストリームは、シーケンシャルからパラレルに簡単に変換できます。
これは、シーケンシャルストリームに並列メソッドを追加するか、コレクションのparallelStreamメソッドを使用してストリームを作成することで実現できます。
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);
並列ストリームを使用すると、別々のコアでコードを並列に実行できます。 最終的な結果は、個々の結果の組み合わせです。
ただし、実行の順序は制御できません。 プログラムを実行するたびに変更される可能性があります。
4 ForkJoinPool.commonPool-worker-3
2 ForkJoinPool.commonPool-worker-5
1 ForkJoinPool.commonPool-worker-7
3 main
3. フォーク-フレームワークに参加
並列ストリームは、fork-joinフレームワークとそのワーカースレッドの共通プールを利用します。
複数のスレッド間のタスク管理を処理するために、Java7のjava.util.concurrentにフォーク結合フレームワークが追加されました。
3.1. ソースの分割
フォークジョインフレームワークは、ワーカースレッド間でのソースデータの分割と、タスク完了時のコールバックの処理を担当します。
整数の合計を並列に計算する例を見てみましょう。
reduce メソッドを使用して、ゼロから開始する代わりに、開始合計に5を追加します。
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
assertThat(sum).isNotEqualTo(15);
シーケンシャルストリームでは、この操作の結果は15になります。
ただし、 reduce 操作は並行して処理されるため、実際には5番目の数値がすべてのワーカースレッドで合計されます。
実際の結果は、共通のフォーク結合プールで使用されるスレッドの数によって異なる場合があります。
この問題を修正するには、並列ストリームの外部に番号5を追加する必要があります。
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
assertThat(sum).isEqualTo(15);
したがって、どの操作を並行して実行できるかについて注意する必要があります。
3.2. 共通スレッドプール
共通プール内のスレッドの数は
ただし、APIでは、JVMパラメーターを渡すことで、使用するスレッドの数を指定できます。
-D java.util.concurrent.ForkJoinPool.common.parallelism=4
これはグローバルな設定であり、
3.3. カスタムスレッドプール
デフォルトの共通スレッドプールに加えて、カスタムスレッドプールで並列ストリームを実行することもできます。
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
() -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();
customThreadPool.shutdown();
assertThat(sum).isEqualTo(10);
Oracleでは共通スレッドプールの使用が推奨されていることに注意してください。W eには、カスタムスレッドプールで並列ストリームを実行する非常に適切な理由があります。
4. パフォーマンスへの影響
並列処理は、複数のコアを完全に利用するのに役立つ場合があります。 ただし、複数のスレッドの管理、メモリの局所性、ソースの分割、および結果のマージのオーバーヘッドも考慮する必要があります。
4.1. オーバーヘッド
整数ストリームの例を見てみましょう。
シーケンシャルおよびパラレルリダクション操作でベンチマークを実行します。
IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);
この単純な合計の削減では、シーケンシャルストリームをパラレルストリームに変換すると、パフォーマンスが低下します。
Benchmark Mode Cnt Score Error Units
SplittingCosts.sourceSplittingIntStreamParallel avgt 25 35476,283 ± 204,446 ns/op
SplittingCosts.sourceSplittingIntStreamSequential avgt 25 68,274 ± 0,963 ns/op
この背後にある理由は、スレッド、ソース、および結果の管理のオーバーヘッドが、実際の作業を行うよりもコストのかかる操作である場合があるためです。
4.2. 分割コスト
データソースを均等に分割することは、並列実行を可能にするために必要なコストですが、一部のデータソースは他のデータソースよりも適切に分割されます。
ArrayListとLinkedListを使用してこれを示しましょう。
private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
arrayListOfNumbers.add(i);
linkedListOfNumbers.add(i);
});
}
次の2種類のリストで、順次および並列の削減操作でベンチマークを実行します。
arrayListOfNumbers.stream().reduce(0, Integer::sum)
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
linkedListOfNumbers.stream().reduce(0, Integer::sum);
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);
私たちの結果は、シーケンシャルストリームをパラレルストリームに変換すると、ArrayListに対してのみパフォーマンス上の利点がもたらされることを示しています。
Benchmark Mode Cnt Score Error Units
DifferentSourceSplitting.differentSourceArrayListParallel avgt 25 2004849,711 ± 5289,437 ns/op
DifferentSourceSplitting.differentSourceArrayListSequential avgt 25 5437923,224 ± 37398,940 ns/op
DifferentSourceSplitting.differentSourceLinkedListParallel avgt 25 13561609,611 ± 275658,633 ns/op
DifferentSourceSplitting.differentSourceLinkedListSequential avgt 25 10664918,132 ± 254251,184 ns/op
この背後にある理由は、配列は安価で均等に分割できるが、LinkedListにはこれらのプロパティがないためです。 TreeMapとHashSetは、 LinkedList よりも優れていますが、配列ほどではありません。
4.3. マージコスト
並列計算のためにソースを分割するたびに、最終的に結果を結合することも確認する必要があります。
合計とグループ化を異なるマージ操作として、シーケンシャルストリームとパラレルストリームでベンチマークを実行してみましょう。
arrayListOfNumbers.stream().reduce(0, Integer::sum);
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);
arrayListOfNumbers.stream().collect(Collectors.toSet());
arrayListOfNumbers.stream().parallel().collect(Collectors.toSet())
私たちの結果は、シーケンシャルストリームをパラレルストリームに変換すると、合計演算に対してのみパフォーマンス上の利点がもたらされることを示しています。
Benchmark Mode Cnt Score Error Units
MergingCosts.mergingCostsGroupingParallel avgt 25 135093312,675 ± 4195024,803 ns/op
MergingCosts.mergingCostsGroupingSequential avgt 25 70631711,489 ± 1517217,320 ns/op
MergingCosts.mergingCostsSumParallel avgt 25 2074483,821 ± 7520,402 ns/op
MergingCosts.mergingCostsSumSequential avgt 25 5509573,621 ± 60249,942 ns/op
マージ操作は、縮小や加算などの一部の操作では非常に安価ですが、セットやマップへのグループ化などのマージ操作は非常にコストがかかる可能性があります。
4.4. メモリの局所性
最近のコンピューターは、高度なマルチレベルキャッシュを使用して、頻繁に使用されるデータをプロセッサーの近くに保持します。 線形メモリアクセスパターンが検出されると、ハードウェアは、おそらくすぐに必要になると想定して、次のデータ行をプリフェッチします。
並列処理は、プロセッサコアを忙しくして有用な作業を行うことができる場合に、パフォーマンス上の利点をもたらします。 キャッシュミスを待つことは有用な作業ではないため、メモリ帯域幅を制限要因として考慮する必要があります。
2つの配列を使用してこれを示しましょう。1つはプリミティブ型を使用し、もう1つはオブジェクトデータ型を使用します。
private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
intArray[i-1] = i;
integerArray[i-1] = i;
});
}
2つのアレイでの順次および並列削減操作でベンチマークを実行します。
Arrays.stream(intArray).reduce(0, Integer::sum);
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
Arrays.stream(integerArray).reduce(0, Integer::sum);
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);
私たちの結果は、シーケンシャルストリームをパラレルストリームに変換すると、プリミティブの配列を使用するときにパフォーマンス上の利点がわずかに増えることを示しています。
Benchmark Mode Cnt Score Error Units
MemoryLocalityCosts.localityIntArrayParallel avgt 25 116247,787 ± 283,150 ns/op
MemoryLocalityCosts.localityIntArraySequential avgt 25 293142,385 ± 2526,892 ns/op
MemoryLocalityCosts.localityIntegerArrayParallel avgt 25 2153732,607 ± 16956,463 ns/op
MemoryLocalityCosts.localityIntegerArraySequential avgt 25 5134866,640 ± 148283,942 ns/op
プリミティブの配列は、Javaで可能な限り最高の局所性をもたらします。 一般に、データ構造内のポインターが多いほど、参照オブジェクトをフェッチするためにメモリにかかるプレッシャーが大きくなります。 複数のコアが同時にメモリからデータをフェッチするため、これは並列化に悪影響を与える可能性があります。
4.5. NQモデル
オラクルは、並列処理によってパフォーマンスが向上するかどうかを判断するのに役立つ単純なモデルを提示しました。 NQ モデルでは、 N はソースデータ要素の数を表し、Qはデータ要素ごとに実行される計算量を表します。
N * Q の積が大きいほど、並列化によってパフォーマンスが向上する可能性が高くなります。 数値の合計など、 Q が非常に小さい問題の場合、経験則では、Nは10,000より大きくする必要があります。 計算の数が増えると、並列処理によってパフォーマンスを向上させるために必要なデータサイズが減少します。
5. パラレルストリームを使用する場合
これまで見てきたように、並列ストリームを使用するときは、非常に配慮する必要があります。
並列処理は、特定のユースケースでパフォーマンス上の利点をもたらす可能性があります。 しかし、並列ストリームは、魔法のパフォーマンスブースターと見なすことはできません。 したがって、シーケンシャルストリームは、開発中のデフォルトとして引き続き使用する必要があります。
実際のパフォーマンス要件がある場合、シーケンシャルストリームをパラレルストリームに変換できます。これらの要件を考慮して、最初にパフォーマンス測定を実行し、可能な最適化戦略として並列処理を検討する必要があります。
大量のデータと要素ごとに実行される多くの計算は、並列処理が適切なオプションである可能性があることを示しています。
一方、少量のデータ、ソースの不均一な分割、コストのかかるマージ操作、およびメモリの局所性の低さは、並列実行の潜在的な問題を示しています。
6. 結論
この記事では、Javaのシーケンシャルストリームとパラレルストリームの違いについて説明しました。 並列ストリームはデフォルトのフォーク結合プールとそのワーカースレッドを利用することを学びました。
次に、並列ストリームが必ずしもパフォーマンス上の利点をもたらすとは限らないことを確認しました。 複数のスレッドの管理、メモリの局所性、ソースの分割、結果のマージのオーバーヘッドを考慮しました。 アレイは、可能な限り最高のローカリティをもたらし、安価で均等に分割できるため、並列実行に最適なデータソースであることがわかりました。
最後に、 NQ モデルを確認し、実際のパフォーマンス要件がある場合にのみ並列ストリームを使用することをお勧めします。
いつものように、ソースコードはGitHubでから入手できます。