1前書き

この記事では、Javaのスレッドプールについて説明します。標準のJavaライブラリのさまざまな実装から始め、次にGoogleのGuavaライブラリを調べます。

** 2スレッドプール

**

Javaでは、スレッドはオペレーティングシステムのリソースであるシステムレベルのスレッドにマッピングされます。あなたが手に負えないほどにスレッドを作成すると、あなたはすぐにこれらのリソースを使い果たすかもしれません。

並列処理をエミュレートするために、スレッド間のコンテキスト切り替えもオペレーティングシステムによって行われます。単純化した見方は、生成するスレッドが多いほど、各スレッドが実際の作業に費やす時間が短くなるということです。

スレッドプールパターンは、マルチスレッドアプリケーションのリソースを節約し、また事前定義済みの制限内に並列処理を抑えるのに役立ちます。

スレッドプールを使用するときは、並列タスクの形式で並行コードを作成し、実行のためにそれらをスレッドプールのインスタンスに送信します。このインスタンスは、これらのタスクを実行するためにいくつかの再利用されたスレッドを制御します。リンク:/uploads/2016-08-10__10-16-52-1024×572.png%201024w[]

パターンを使用すると、

アプリケーションが作成しているスレッドの数

、それらのライフサイクルを制御したり、タスクの実行をスケジュールしたり、着信タスクをキューに入れたりすることができます。


3 Java

のスレッドプール


3.1.

Executors



Executor

、および

ExecutorService



Executors

ヘルパークラスには、事前設定されたスレッドプールインスタンスを作成するためのいくつかのメソッドがあります。これらのクラスは、最初から始めるのに適した場所です。カスタムの微調整を適用する必要がない場合に使用します。


Executor

および

ExecutorService

インターフェースは、Javaのさまざまなスレッドプール実装で機能するために使用されます。通常は、コードをスレッドプールの実際の実装から切り離し、アプリケーション全体でこれらのインタフェースを使用する必要があります。


Executor

インターフェースには、実行のために

Runnable

インスタンスを送信する単一の

execute

メソッドがあります。

  • これは、

    Executors

    APIを使用して、シングルスレッドプールとタスクを順次実行するための無制限のキューに裏付けされた

    Executor

    インスタンスを取得する方法の簡単な例です。ここでは、画面に「

    Hello World

    」と表示するだけの単一のタスクを実行します。タスクは

    Runnable

    であると推論されるラムダ(Java 8の機能)として送信されます。

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));


ExecutorService

インターフェースには、タスクの進行状況を制御し、サービスの終了を管理するための多数のメソッドが含まれています。このインタフェースを使用して、実行のためにタスクを送信し、返された

Future

インスタンスを使用してそれらの実行を制御することもできます。

  • 次の例では、

    ExecutorService

    を作成し、タスクを送信してから、返された

    Future

    s

    get

    メソッドを使用して、送信されたタスクが終了して値が返されるまで待機します。

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");//some operations
String result = future.get();

もちろん、実際のシナリオでは、通常

future.get()

をすぐに呼び出したくはありませんが、実際に計算の値が必要になるまで呼び出しを延期してください。


submit

メソッドは

Runnable

または

Callable

を取るためにオーバーロードされています。どちらも機能的なインターフェースで、ラムダとして渡すことができます(Java 8以降)。


Runnable

の単一メソッドは例外をスローせず、値を返しません。

Callable

インターフェースは、例外をスローして値を返すことを可能にするので、より便利かもしれません。

最後に – コンパイラが

Callable

型を推測できるようにするには、単にラムダから値を返します。


ExecutorService

インターフェースおよび将来の使用に関するその他の例については、「

Java ExecutorServiceの手引き

」を参照してください。


3.2.

ThreadPoolExecutor



ThreadPoolExecutor

は、微調整のための多くのパラメータとフックを備えた拡張可能なスレッドプールの実装です。

ここで説明する主な設定パラメータは次のとおりです。


  • corePoolSize





    maximumPoolSize


    、および


    keepAliveTime

    ** 。

プールは、常に内部に保持される固定数のコアスレッドと、不要になった時点で生成されて終了する可能性がある過剰なスレッドから構成されます。

corePoolSize

パラメーターは、インスタンス化されプールに保持されるコアスレッドの量です。すべてのコアスレッドがビジーで、さらにタスクがサブミットされた場合、プールは最大プールサイズにまで拡大することができます。


keepAliveTime

パラメータは、過剰なスレッド(つまり、

corePoolSize

を超えてインスタンス化されたスレッド)がアイドル状態で存在できる時間間隔です。

これらのパラメータは幅広いユースケースを網羅していますが、

最も一般的な設定は

Executors

静的メソッドで事前定義されています

  • 例えば** 、

    newFixedThreadPool

    メソッドは、等しい

    corePoolSize



    maximumPoolSize

    のパラメータ値と0の

    keepAliveTimeを持つ

    ThreadPoolExecutor__を作成します。これは、このスレッドプールのスレッド数が常に同じであることを意味します。

ThreadPoolExecutor executor =
  (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

上記の例では、固定スレッド数2で

ThreadPoolExecutor

をインスタンス化しています。これは、同時に実行されているタスクの数が常に2以下の場合、すぐに実行されることを意味します。さもなければ

これらのタスクのいくつかはそれらの順番を待つためにキューに入れられるかもしれません

1000ミリ秒間スリープすることで重い作業を模倣する3つの

Callable

タスクを作成しました。最初の2つのタスクは一度に実行され、3番目のタスクはキューで待機する必要があります。タスクを送信した直後に

getPoolSize()

および

getQueue()。size()

メソッドを呼び出すことで確認できます。


Executors.newCachedThreadPool()

メソッドを使用して、別の事前設定済みの

ThreadPoolExecutor

を作成できます。このメソッドはスレッド数をまったく受け取りません。このインスタンスでは、

corePoolSize

は実際には0に設定され、

maximumPoolSize



Integer.MAX

VALUE__に設定されています。

この場合の

keepAliveTime

は60秒です。

これらのパラメータ値は、** キャッシュされたスレッドプールは、送信されたタスクの量に対応するために、制限なしに拡張できることを意味します。しかし、スレッドが不要になった場合は、60秒間何も操作しないと破棄されます。典型的なユースケースは、アプリケーションに短期間のタスクがたくさんある場合です。

ThreadPoolExecutor executor =
  (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

上記の例のキューサイズは、内部的に

SynchronousQueue

インスタンスが使用されているため、常にゼロになります。

SynchronousQueue

では、

insert

および

remove

操作のペアは常に同時に発生するため、キューには実際には何も含まれません。


Executors.newSingleThreadExecutor()

APIは、単一スレッドを含む

ThreadPoolExecutorのもう1つの典型的な形式を作成します。

シングルスレッドエグゼキュータは、イベントループを作成するのに理想的です


corePoolSize

および

maximumPoolSize

パラメータは1に等しく、

keepAliveTime__はゼロです。

上記の例のタスクは順番に実行されるので、タスクの完了後はflagの値は2になります。

AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});

さらに、この

ThreadPoolExecutor

は不変のラッパーで装飾されているため、作成後に再設定することはできません。これも

ThreadPoolExecutor

にキャストできない理由です。


3.3.

ScheduledThreadPoolExecutor



ScheduledThreadPoolExecutor

は、

ThreadPoolExecutor

クラスを拡張し、いくつかの追加メソッドで

ScheduledExecutorService

インターフェースも実装します。


  • schedule

    メソッドは指定された後に一度だけタスクを実行することを可能にします

ディレイ;
**

scheduleAtFixedRate

メソッドでは、タスクの実行後にタスクを実行できます。

初期遅延を指定してから、一定の時間をかけて繰り返し実行します。
期間;

period

引数は、開始から開始までの時間

です。
タスクの時間

なので、実行率は固定です。
**

scheduleWithFixedDelay

メソッドは、

scheduleAtFixedRate

メソッドと似ています。

それは与えられたタスクを繰り返し実行するが、指定された遅延は

前のタスクの終わりから次のタスクの始めまでの間に測定される

実行速度は、特定のタスクの実行にかかる時間によって異なります。


Executors.newScheduledThreadPool()

メソッドは通常、指定された

corePoolSize

、無制限の

maximumPoolSize

、およびゼロの

keepAliveTime

を使用して

ScheduledThreadPoolExecutor

を作成するために使用されます。 500ミリ秒でタスクの実行をスケジュールする方法は次のとおりです。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

次のコードは、500ミリ秒後にタスクを実行してから100ミリ秒ごとに繰り返す方法を示しています。タスクをスケジュールした後、

CountDownLatch

lock

__を使用してタスクが3回起動されるまで待機し、次に

Future.cancel()__メソッドを使用してタスクをキャンセルします。

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);


3.4.

フォークジョインプール



ForkJoinPool

は、Java 7で導入された

fork/join

フレームワークの中心部分です。これは、

再帰的アルゴリズムで複数のタスクを生成する

という一般的な問題を解決します。単純な

ThreadPoolExecutor

を使用すると、すべてのタスクまたはサブタスクに独自のスレッドを実行する必要があるため、スレッドがすぐになくなります。


fork/join

フレームワークでは、どのタスクでもいくつかのサブタスクを生成(

fork

)し、

join

メソッドを使用してそれらの完了を待つことができます。

fork/join

フレームワークの利点は、各タスクまたはサブタスクに対して

新しいスレッドを作成しないこと

であり、代わりにWork Stealingアルゴリズムを実装することです。このフレームワークは、記事「

JavaにおけるFork/Joinフレームワークのガイド

」で詳しく説明されています。


ForkJoinPool

を使用してノードのツリーをたどり、すべてのリーフ値の合計を計算する簡単な例を見てみましょう。これは、ノード、

int

値、および子ノードのセットで構成されるツリーの簡単な実装です。

static class TreeNode {

    int value;

    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

ツリー内のすべての値を並列に合計したい場合は、

RecursiveTask <Integer>

インターフェースを実装する必要があります。各タスクはそれ自身のノードを受け取り、その値をその

children

の値の合計に追加します。

children

値の合計を計算するために、タスク実装は以下を行います。

  • 子供セットをストリーミングする

  • このストリームをマップし、各要素に対して新しい

    CountingTask

    を作成します。

  • 各サブタスクをフォークして実行します。

  • 分岐した各タスクで

    join

    メソッドを呼び出して結果を収集します。


  • Collectors.summingInt

    コレクターを使用して結果を合計します。

public static class CountingTask extends RecursiveTask<Integer> {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
          .map(childNode -> new CountingTask(childNode).fork())
          .collect(Collectors.summingInt(ForkJoinTask::join));
    }
}

実際のツリーで計算を実行するためのコードはとても単純です。

TreeNode tree = new TreeNode(5,
  new TreeNode(3), new TreeNode(2,
    new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Guavaでのスレッドプールの実装


Guava

は人気のあるGoogleのユーティリティライブラリです。

ExecutorService

のいくつかの便利な実装を含む、多くの便利な同時実行クラスがあります。実装クラスは直接のインスタンス化やサブクラス化にはアクセスできないので、それらのインスタンスを作成する唯一のエントリポイントは

MoreExecutors

ヘルパークラスです。


4.1. Mavenの依存関係としてのGuavaの追加

プロジェクトにGuavaライブラリを含めるには、Maven pomファイルに次の依存関係を追加します。

https://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22com.google.guava%22%20AND%20a%3A%22guava%にGuavaライブラリの最新バージョンがあります。

22[Maven Central]リポジトリ:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>19.0</version>
</dependency>


4.2. 直接執行者および直接執行者サービス

状況に応じて、現在のスレッドまたはスレッドプールのいずれかでタスクを実行したいことがあります。単一の

Executor

インターフェースを使用して、実装を切り替えることをお勧めします。

現在のスレッドでタスクを実行する

Executor

または

ExecutorService

の実装を思いつくことはそれほど難しくありませんが、それでもまだいくつかの定型コードを書く必要があります。

喜んで、Guavaは私たちに事前定義されたインスタンスを提供します。

  • これは、同じスレッドでタスクを実行する例です。提供されたタスクは500ミリ秒間スリープしますが、現在のスレッドを** ブロックし、

    execute

    呼び出しが終了した直後に結果が利用可能になります。

Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executed.set(true);
});

assertTrue(executed.get());


directExecutor()

メソッドによって返されるインスタンスは、実際には静的なシングルトンなので、このメソッドを使用しても、オブジェクト作成にオーバーヘッドはまったく発生しません。

このAPIは

MoreExecutors.newDirectExecutorService()

よりも使用する必要があります。これは、そのAPIが呼び出しごとに本格的なexecutorサービスの実装を作成するためです。


4.3. エグゼキュータサービスの終了

もう1つの一般的な問題は、スレッドプールがまだそのタスクを実行している間に、仮想マシンをシャットダウンすることです。キャンセルメカニズムが設定されていても、executorサービスがシャットダウンしたときにタスクがうまく動作して作業を停止するという保証はありません。これにより、タスクが作業を続けている間、JVMが無期限にハングすることがあります。

この問題を解決するために、Guavaは既存のexecutorサービスのファミリーを導入しました。これらはJVM

と一緒に終了する

デーモンスレッドに基づいています。

これらのサービスは、

Runtime.getRuntime()。addShutdownHook()

メソッドを使用してシャットダウンフックを追加し、ハングアップしたタスクをあきらめる前にVMが構成された時間内に終了しないようにします。

次の例では、無限ループを含むタスクを送信していますが、VMの終了時にタスクを待機するために、100ミリ秒の設定時間で既存のexecutorサービスを使用します。


exitingExecutorService

が設定されていないと、このタスクによってVMが無期限にハングします。

ThreadPoolExecutor executor =
  (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService =
  MoreExecutors.getExitingExecutorService(executor,
    100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
    while (true) {
    }
});


4.4. リスニングデコレータ

リスニングデコレータを使用すると、タスクの送信時に、単純な

Future

インスタンスの代わりに

ExecutorService

をラップして

ListenableFuture

インスタンスを受け取ることができます。

ListenableFuture

インターフェースは

Future

を拡張し、単一の追加メソッド

addListener

を持ちます。このメソッドにより、将来の完了時に呼び出されるリスナーを追加できます。


ListenableFuture.addListener()

メソッドを直接使用することはめったにありませんが、

Futures

ユーティリティクラスのほとんどのヘルパーメソッドには不可欠です。たとえば、

Futures.allAsList()

メソッドを使用すると、結合されたすべての未来が正常に完了したときに完了する、複数の

ListenableFuture

インスタンスを単一の

ListenableFuture

に結合できます。

ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService =
  MoreExecutors.listeningDecorator(executorService);

ListenableFuture<String> future1 =
  listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 =
  listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
  .stream()
  .collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);


5結論

この記事では、標準のJavaライブラリとGoogleのGuavaライブラリにおけるスレッドプールパターンとその実装について説明しました。

この記事のソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]から入手できます。