Javaのスレッドプールの紹介
1前書き
この記事では、Javaのスレッドプールについて説明します。標準のJavaライブラリのさまざまな実装から始め、次にGoogleのGuavaライブラリを調べます。
** 2スレッドプール
**
Javaでは、スレッドはオペレーティングシステムのリソースであるシステムレベルのスレッドにマッピングされます。あなたが手に負えないほどにスレッドを作成すると、あなたはすぐにこれらのリソースを使い果たすかもしれません。
並列処理をエミュレートするために、スレッド間のコンテキスト切り替えもオペレーティングシステムによって行われます。単純化した見方は、生成するスレッドが多いほど、各スレッドが実際の作業に費やす時間が短くなるということです。
スレッドプールパターンは、マルチスレッドアプリケーションのリソースを節約し、また事前定義済みの制限内に並列処理を抑えるのに役立ちます。
スレッドプールを使用するときは、並列タスクの形式で並行コードを作成し、実行のためにそれらをスレッドプールのインスタンスに送信します。このインスタンスは、これらのタスクを実行するためにいくつかの再利用されたスレッドを制御します。リンク:/uploads/2016-08-10__10-16-52-1024×572.png%201024w[]
パターンを使用すると、
アプリケーションが作成しているスレッドの数
、それらのライフサイクルを制御したり、タスクの実行をスケジュールしたり、着信タスクをキューに入れたりすることができます。
3 Java
のスレッドプール
3.1.
Executors
、
Executor
、および
ExecutorService
Executors
ヘルパークラスには、事前設定されたスレッドプールインスタンスを作成するためのいくつかのメソッドがあります。これらのクラスは、最初から始めるのに適した場所です。カスタムの微調整を適用する必要がない場合に使用します。
Executor
および
ExecutorService
インターフェースは、Javaのさまざまなスレッドプール実装で機能するために使用されます。通常は、コードをスレッドプールの実際の実装から切り離し、アプリケーション全体でこれらのインタフェースを使用する必要があります。
Executor
インターフェースには、実行のために
Runnable
インスタンスを送信する単一の
execute
メソッドがあります。
-
これは、
Executors
APIを使用して、シングルスレッドプールとタスクを順次実行するための無制限のキューに裏付けされた
Executor
インスタンスを取得する方法の簡単な例です。ここでは、画面に「
Hello World
」と表示するだけの単一のタスクを実行します。タスクは
Runnable
であると推論されるラムダ(Java 8の機能)として送信されます。
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));
ExecutorService
インターフェースには、タスクの進行状況を制御し、サービスの終了を管理するための多数のメソッドが含まれています。このインタフェースを使用して、実行のためにタスクを送信し、返された
Future
インスタンスを使用してそれらの実行を制御することもできます。
-
次の例では、
ExecutorService
を作成し、タスクを送信してから、返された
Future
s
get
メソッドを使用して、送信されたタスクが終了して値が返されるまで待機します。
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");//some operations
String result = future.get();
もちろん、実際のシナリオでは、通常
future.get()
をすぐに呼び出したくはありませんが、実際に計算の値が必要になるまで呼び出しを延期してください。
submit
メソッドは
Runnable
または
Callable
を取るためにオーバーロードされています。どちらも機能的なインターフェースで、ラムダとして渡すことができます(Java 8以降)。
Runnable
の単一メソッドは例外をスローせず、値を返しません。
Callable
インターフェースは、例外をスローして値を返すことを可能にするので、より便利かもしれません。
最後に – コンパイラが
Callable
型を推測できるようにするには、単にラムダから値を返します。
ExecutorService
インターフェースおよび将来の使用に関するその他の例については、「
Java ExecutorServiceの手引き
」を参照してください。
3.2.
ThreadPoolExecutor
ThreadPoolExecutor
は、微調整のための多くのパラメータとフックを備えた拡張可能なスレッドプールの実装です。
ここで説明する主な設定パラメータは次のとおりです。
-
corePoolSize
、
maximumPoolSize
、および
keepAliveTime
** 。
プールは、常に内部に保持される固定数のコアスレッドと、不要になった時点で生成されて終了する可能性がある過剰なスレッドから構成されます。
corePoolSize
パラメーターは、インスタンス化されプールに保持されるコアスレッドの量です。すべてのコアスレッドがビジーで、さらにタスクがサブミットされた場合、プールは最大プールサイズにまで拡大することができます。
keepAliveTime
パラメータは、過剰なスレッド(つまり、
corePoolSize
を超えてインスタンス化されたスレッド)がアイドル状態で存在できる時間間隔です。
これらのパラメータは幅広いユースケースを網羅していますが、
最も一般的な設定は
Executors
静的メソッドで事前定義されています
。
-
例えば** 、
newFixedThreadPool
メソッドは、等しい
corePoolSize
と
maximumPoolSize
のパラメータ値と0の
keepAliveTimeを持つ
ThreadPoolExecutor__を作成します。これは、このスレッドプールのスレッド数が常に同じであることを意味します。
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());
上記の例では、固定スレッド数2で
ThreadPoolExecutor
をインスタンス化しています。これは、同時に実行されているタスクの数が常に2以下の場合、すぐに実行されることを意味します。さもなければ
これらのタスクのいくつかはそれらの順番を待つためにキューに入れられるかもしれません
。
1000ミリ秒間スリープすることで重い作業を模倣する3つの
Callable
タスクを作成しました。最初の2つのタスクは一度に実行され、3番目のタスクはキューで待機する必要があります。タスクを送信した直後に
getPoolSize()
および
getQueue()。size()
メソッドを呼び出すことで確認できます。
Executors.newCachedThreadPool()
メソッドを使用して、別の事前設定済みの
ThreadPoolExecutor
を作成できます。このメソッドはスレッド数をまったく受け取りません。このインスタンスでは、
corePoolSize
は実際には0に設定され、
maximumPoolSize
は
Integer.MAX
VALUE__に設定されています。
この場合の
keepAliveTime
は60秒です。
これらのパラメータ値は、** キャッシュされたスレッドプールは、送信されたタスクの量に対応するために、制限なしに拡張できることを意味します。しかし、スレッドが不要になった場合は、60秒間何も操作しないと破棄されます。典型的なユースケースは、アプリケーションに短期間のタスクがたくさんある場合です。
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
executor.submit(() -> {
Thread.sleep(1000);
return null;
});
assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());
上記の例のキューサイズは、内部的に
SynchronousQueue
インスタンスが使用されているため、常にゼロになります。
SynchronousQueue
では、
insert
および
remove
操作のペアは常に同時に発生するため、キューには実際には何も含まれません。
Executors.newSingleThreadExecutor()
APIは、単一スレッドを含む
ThreadPoolExecutorのもう1つの典型的な形式を作成します。
シングルスレッドエグゼキュータは、イベントループを作成するのに理想的です
corePoolSize
および
maximumPoolSize
パラメータは1に等しく、
keepAliveTime__はゼロです。
上記の例のタスクは順番に実行されるので、タスクの完了後はflagの値は2になります。
AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
counter.set(1);
});
executor.submit(() -> {
counter.compareAndSet(1, 2);
});
さらに、この
ThreadPoolExecutor
は不変のラッパーで装飾されているため、作成後に再設定することはできません。これも
ThreadPoolExecutor
にキャストできない理由です。
3.3.
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
は、
ThreadPoolExecutor
クラスを拡張し、いくつかの追加メソッドで
ScheduledExecutorService
インターフェースも実装します。
-
schedule
メソッドは指定された後に一度だけタスクを実行することを可能にします
ディレイ;
**
scheduleAtFixedRate
メソッドでは、タスクの実行後にタスクを実行できます。
初期遅延を指定してから、一定の時間をかけて繰り返し実行します。
期間;
period
引数は、開始から開始までの時間
です。
タスクの時間
なので、実行率は固定です。
**
scheduleWithFixedDelay
メソッドは、
scheduleAtFixedRate
メソッドと似ています。
それは与えられたタスクを繰り返し実行するが、指定された遅延は
前のタスクの終わりから次のタスクの始めまでの間に測定される
実行速度は、特定のタスクの実行にかかる時間によって異なります。
Executors.newScheduledThreadPool()
メソッドは通常、指定された
corePoolSize
、無制限の
maximumPoolSize
、およびゼロの
keepAliveTime
を使用して
ScheduledThreadPoolExecutor
を作成するために使用されます。 500ミリ秒でタスクの実行をスケジュールする方法は次のとおりです。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);
次のコードは、500ミリ秒後にタスクを実行してから100ミリ秒ごとに繰り返す方法を示しています。タスクをスケジュールした後、
CountDownLatch
lock
__を使用してタスクが3回起動されるまで待機し、次に
Future.cancel()__メソッドを使用してタスクをキャンセルします。
CountDownLatch lock = new CountDownLatch(3);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
System.out.println("Hello World");
lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);
lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
3.4.
フォークジョインプール
ForkJoinPool
は、Java 7で導入された
fork/join
フレームワークの中心部分です。これは、
再帰的アルゴリズムで複数のタスクを生成する
という一般的な問題を解決します。単純な
ThreadPoolExecutor
を使用すると、すべてのタスクまたはサブタスクに独自のスレッドを実行する必要があるため、スレッドがすぐになくなります。
fork/join
フレームワークでは、どのタスクでもいくつかのサブタスクを生成(
fork
)し、
join
メソッドを使用してそれらの完了を待つことができます。
fork/join
フレームワークの利点は、各タスクまたはサブタスクに対して
新しいスレッドを作成しないこと
であり、代わりにWork Stealingアルゴリズムを実装することです。このフレームワークは、記事「
JavaにおけるFork/Joinフレームワークのガイド
」で詳しく説明されています。
ForkJoinPool
を使用してノードのツリーをたどり、すべてのリーフ値の合計を計算する簡単な例を見てみましょう。これは、ノード、
int
値、および子ノードのセットで構成されるツリーの簡単な実装です。
static class TreeNode {
int value;
Set<TreeNode> children;
TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}
}
ツリー内のすべての値を並列に合計したい場合は、
RecursiveTask <Integer>
インターフェースを実装する必要があります。各タスクはそれ自身のノードを受け取り、その値をその
children
の値の合計に追加します。
children
値の合計を計算するために、タスク実装は以下を行います。
-
子供セットをストリーミングする
-
このストリームをマップし、各要素に対して新しい
CountingTask
を作成します。 -
各サブタスクをフォークして実行します。
-
分岐した各タスクで
join
メソッドを呼び出して結果を収集します。 -
Collectors.summingInt
コレクターを使用して結果を合計します。
public static class CountingTask extends RecursiveTask<Integer> {
private final TreeNode node;
public CountingTask(TreeNode node) {
this.node = node;
}
@Override
protected Integer compute() {
return node.value + node.children.stream()
.map(childNode -> new CountingTask(childNode).fork())
.collect(Collectors.summingInt(ForkJoinTask::join));
}
}
実際のツリーで計算を実行するためのコードはとても単純です。
TreeNode tree = new TreeNode(5,
new TreeNode(3), new TreeNode(2,
new TreeNode(2), new TreeNode(8)));
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
4. Guavaでのスレッドプールの実装
Guava
は人気のあるGoogleのユーティリティライブラリです。
ExecutorService
のいくつかの便利な実装を含む、多くの便利な同時実行クラスがあります。実装クラスは直接のインスタンス化やサブクラス化にはアクセスできないので、それらのインスタンスを作成する唯一のエントリポイントは
MoreExecutors
ヘルパークラスです。
4.1. Mavenの依存関係としてのGuavaの追加
プロジェクトにGuavaライブラリを含めるには、Maven pomファイルに次の依存関係を追加します。
https://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22com.google.guava%22%20AND%20a%3A%22guava%にGuavaライブラリの最新バージョンがあります。
22[Maven Central]リポジトリ:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
4.2. 直接執行者および直接執行者サービス
状況に応じて、現在のスレッドまたはスレッドプールのいずれかでタスクを実行したいことがあります。単一の
Executor
インターフェースを使用して、実装を切り替えることをお勧めします。
現在のスレッドでタスクを実行する
Executor
または
ExecutorService
の実装を思いつくことはそれほど難しくありませんが、それでもまだいくつかの定型コードを書く必要があります。
喜んで、Guavaは私たちに事前定義されたインスタンスを提供します。
-
これは、同じスレッドでタスクを実行する例です。提供されたタスクは500ミリ秒間スリープしますが、現在のスレッドを** ブロックし、
execute
呼び出しが終了した直後に結果が利用可能になります。
Executor executor = MoreExecutors.directExecutor();
AtomicBoolean executed = new AtomicBoolean();
executor.execute(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
executed.set(true);
});
assertTrue(executed.get());
directExecutor()
メソッドによって返されるインスタンスは、実際には静的なシングルトンなので、このメソッドを使用しても、オブジェクト作成にオーバーヘッドはまったく発生しません。
このAPIは
MoreExecutors.newDirectExecutorService()
よりも使用する必要があります。これは、そのAPIが呼び出しごとに本格的なexecutorサービスの実装を作成するためです。
4.3. エグゼキュータサービスの終了
もう1つの一般的な問題は、スレッドプールがまだそのタスクを実行している間に、仮想マシンをシャットダウンすることです。キャンセルメカニズムが設定されていても、executorサービスがシャットダウンしたときにタスクがうまく動作して作業を停止するという保証はありません。これにより、タスクが作業を続けている間、JVMが無期限にハングすることがあります。
この問題を解決するために、Guavaは既存のexecutorサービスのファミリーを導入しました。これらはJVM
と一緒に終了する
デーモンスレッドに基づいています。
これらのサービスは、
Runtime.getRuntime()。addShutdownHook()
メソッドを使用してシャットダウンフックを追加し、ハングアップしたタスクをあきらめる前にVMが構成された時間内に終了しないようにします。
次の例では、無限ループを含むタスクを送信していますが、VMの終了時にタスクを待機するために、100ミリ秒の設定時間で既存のexecutorサービスを使用します。
exitingExecutorService
が設定されていないと、このタスクによってVMが無期限にハングします。
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService =
MoreExecutors.getExitingExecutorService(executor,
100, TimeUnit.MILLISECONDS);
executorService.submit(() -> {
while (true) {
}
});
4.4. リスニングデコレータ
リスニングデコレータを使用すると、タスクの送信時に、単純な
Future
インスタンスの代わりに
ExecutorService
をラップして
ListenableFuture
インスタンスを受け取ることができます。
ListenableFuture
インターフェースは
Future
を拡張し、単一の追加メソッド
addListener
を持ちます。このメソッドにより、将来の完了時に呼び出されるリスナーを追加できます。
ListenableFuture.addListener()
メソッドを直接使用することはめったにありませんが、
Futures
ユーティリティクラスのほとんどのヘルパーメソッドには不可欠です。たとえば、
Futures.allAsList()
メソッドを使用すると、結合されたすべての未来が正常に完了したときに完了する、複数の
ListenableFuture
インスタンスを単一の
ListenableFuture
に結合できます。
ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService =
MoreExecutors.listeningDecorator(executorService);
ListenableFuture<String> future1 =
listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 =
listeningExecutorService.submit(() -> "World");
String greeting = Futures.allAsList(future1, future2).get()
.stream()
.collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);
5結論
この記事では、標準のJavaライブラリとGoogleのGuavaライブラリにおけるスレッドプールパターンとその実装について説明しました。
この記事のソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]から入手できます。