1. 概要

Java 8は、データに対して一括操作を実行する効率的な方法として、S treamsの概念を導入しました。 また、並行性をサポートする環境では、並列ストリームを取得できます。

これらのストリームは、マルチスレッドのオーバーヘッドを犠牲にして、パフォーマンスを向上させることができます。

このクイックチュートリアルでは、 Stream API の最大の制限の1つを確認し、カスタム ThreadPool インスタンスを使用して並列ストリームを機能させる方法を確認します– thisを処理するライブラリがあります。

2. パラレルストリーム

簡単な例から始めましょう– [X43X]CollectionタイプのいずれかでparallelStreamメソッドを呼び出すと、おそらく並列のStreamが返されます。

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();
        
    assertTrue(parallelStream.isParallel());
}

このようなStreamで発生するデフォルトの処理では、 ForkJoinPool.commonPool()、アプリケーション全体で共有されるスレッドプールが使用されます。

3. カスタムスレッドプール

ストリームを処理するときに、実際にカスタムThreadPoolを渡すことができます。

次の例では、並列StreamでカスタムThreadPoolを使用して、1から1,000,000までの長い値の合計を計算します。

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() 
  throws InterruptedException, ExecutionException {
    
    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
 
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

並列処理レベルが4のForkJoinPoolコンストラクターを使用しました。 さまざまな環境に最適な値を決定するには、ある程度の実験が必要ですが、経験則としては、CPUのコア数に基づいて数を選択するだけです。

次に、並列 Stream のコンテンツを処理し、reduce呼び出しで合計しました。

この単純な例では、カスタムスレッドプールを使用することの完全な有用性を示していない可能性がありますが、ネットワークソースからのデータの処理など、共通のスレッドプールを長時間実行するタスクと結び付けたくない状況では利点が明らかになります。 –または、共通スレッドプールがアプリケーション内の他のコンポーネントによって使用されています。

上記のテストメソッドを実行すると、合格します。 ここまでは順調ですね。

ただし、テストメソッドと同じ方法で通常のメソッドで ForkJoinPool クラスをインスタンス化すると、OutOfMemoryErrorが発生する可能性があります。

次に、メモリリークの原因を詳しく見ていきましょう。

4. メモリリークに注意してください

前に説明したように、共通スレッドプールはデフォルトでアプリケーション全体で使用されます。 共通スレッドプールは静的ThreadPoolインスタンスです。

したがって、デフォルトのスレッドプールを使用しても、メモリリークは発生しません。

それでは、テスト方法を確認しましょう。 テストメソッドでは、次のオブジェクトを作成しました ForkJoinPool。 試験方法が終了したら、 customThreadPoolオブジェクトは逆参照されず、ガベージコレクションされません。代わりに、新しいタスクが割り当てられるのを待機します。

つまり、テストメソッドを呼び出すたびに、新しい customThreadPool オブジェクトが作成され、リリースされません。

この問題の修正は非常に簡単です。メソッドを実行した後、 shutdown customThreadPoolオブジェクトを実行します。

try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}

5. 結論

カスタムThreadPoolを使用して並列Streamを実行する方法を簡単に説明しました。 適切な環境で、並列処理レベルを適切に使用することで、特定の状況でパフォーマンスを向上させることができます。

custom ThreadPool を作成する場合は、メモリリークを回避するために、その shutdown()メソッドを呼び出すことを覚えておく必要があります。

この記事で参照されている完全なコードサンプルは、GitHubにあります。