CompletableFutureへのガイド
1前書き
この記事は、Java 8 Concurrency APIの改良として導入された
CompletableFuture
クラスの機能と使用例についてのガイドです。
2 Java
での非同期計算
非同期計算は、考えるのが難しいです。通常、私たちはあらゆる計算を一連のステップとして考えたいと思います。しかし、非同期計算の場合、コールバックとして表されるアクションは、コード全体に分散しているか、または互いの内側に深くネストされている傾向があります。
いずれかの手順で発生する可能性があるエラーを処理する必要がある場合、状況はさらに悪化します。
非同期計算の結果として機能するためにJava 5では
Future
インタフェースが追加されましたが、これらの計算を組み合わせたり、起こりうるエラーを処理するためのメソッドはありませんでした。
-
Java 8では、
CompletableFuture
クラスが導入されました**
Future
インタフェースとともに、それはまた
CompletionStage
インタフェースを実装しました。このインタフェースは、他のステップと組み合わせることができる非同期計算ステップの規約を定義します。
CompletableFuture
は同時に、非同期計算ステップの作成、結合、実行、およびエラー処理のための、約50種類の異なるメソッドを持つビルディングブロックおよびフレームワークです。
そのような大規模なAPIは圧倒的になる可能性がありますが、これらは主にいくつかの明確で異なる使用例に分類されます。
3
CompletableFuture
を単純な
Future
として使用する
まず第一に、
CompletableFuture
クラスは
Future
インターフェースを実装しているので、
Future
実装として
使用できますが、追加の補完ロジック
があります。
たとえば、将来の結果を表すために引数なしのコンストラクタを使用してこのクラスのインスタンスを作成し、それをコンシューマに配布し、
complete
メソッドを使用して将来的に完成させることができます。この結果が提供されるまで、コンシューマは
get
メソッドを使用して現在のスレッドをブロックできます。
以下の例では、
CompletableFuture
インスタンスを作成してから別のスレッドで計算をスピンオフしてすぐに
Future
を返すメソッドがあります。
計算が完了すると、メソッドは結果を
complete
メソッドに提供することで
Future
を完成させます。
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
計算をスピンオフするには、記事リンクに記載されている
Executor
APIを使用します。/thread-pool-java-and-guava生スレッドを含む任意の並行性メカニズムまたはAPIと一緒に使用します。
calculateAsync
メソッドが
Future
インスタンスを返すことに注意してください。
結果をブロックする準備ができたら、単純にメソッドを呼び出し、
Future
インスタンスを受け取り、その上で
get
メソッドを呼び出します。
get
メソッドはいくつかのチェックされた例外、すなわち
ExecutionException
(計算中に発生した例外のカプセル化)と
InterruptedException
(メソッドを実行しているスレッドが中断されたことを示す例外)を投げます。
Future<String> completableFuture = calculateAsync();
//...
String result = completableFuture.get();
assertEquals("Hello", result);
-
すでに計算結果を知っている場合は、この計算結果を表す引数を付けてstatic
completedFuture
メソッドを使用できます。それから
Future
の
get
メソッドは決してブロックされず、代わりにこの結果を即座に返します。
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
//...
String result = completableFuture.get();
assertEquals("Hello", result);
別のシナリオとして、
Future
** の実行をキャンセルしたいかもしれません。
結果を見つけることができず、非同期実行を完全にキャンセルすることにしたとします。これは
Future
s
cancel
メソッドで行うことができます。このメソッドは
boolean
引数
mayInterruptIfRunning
を受け取りますが、
CompletableFuture
の場合、
CompletableFuture
の処理を制御するための割り込みは使用されないため、効果はありません。
これが非同期メソッドの修正版です。
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
Future.get()
メソッドを使用して結果をブロックするときに、将来がキャンセルされた場合は
CancellationException
がスローされます。
Future<String> future = calculateAsyncWithCancellation();
future.get();//CancellationException
4カプセル化計算ロジックを使った
CompletableFuture
上記のコードを使用すると、同時実行の任意のメカニズムを選択できますが、この定型句をスキップして単純に非同期でコードを実行する場合はどうなりますか。
静的メソッド
runAsync
および
supplyAsync
を使用すると、対応する
Runnable
および
Supplier
機能型から
CompletableFuture
インスタンスを作成できます。
Runnable
と
Supplier
はどちらも、新しいJava 8機能のおかげで、インスタンスをラムダ式として渡すことを可能にする機能インターフェースです。
Runnable
インタフェースは、スレッドで使用されているものと同じ古いインタフェースであり、値を返すことはできません。
Supplier
インタフェースは、引数を持たず、パラメータ化された型の値を返す単一のメソッドを持つ汎用の機能的インタフェースです。
これにより、計算を行い結果を返すラムダ式として
Supplier
のインスタンスを提供することができます。これはとても簡単です。
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
//...
assertEquals("Hello", future.get());
5非同期計算の処理結果
計算結果を処理する最も一般的な方法は、それを関数に渡すことです。
thenApply
メソッドは、
Function
インスタンスを受け取り、それを使用して結果を処理し、関数によって返された値を保持する
Future
を返します。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
Future
チェーンの下の値を返す必要がない場合は、
Consumer
機能インターフェースのインスタンスを使用できます。その単一のメソッドはパラメータを取り、
void
を返します。
CompletableFuture
にこのユースケースのメソッドがあります –
thenAccept
メソッドは
Consumer
を受け取り、それに計算結果を渡します。最後の
future.get()呼び出しは、
Void__型のインスタンスを返します。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
最後に、あなたが計算の値を必要とせず、またチェーンの最後で何らかの値を返したくない場合は、
Runnable
λを
thenRun
メソッドに渡すことができます。次の例では、
future.get()
メソッドが呼び出された後、コンソールに単純に1行印刷します。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
6. 先物の組み合わせ
CompletableFuture
APIの最も優れた点は、** CompletableFuture__インスタンスを一連の計算ステップで組み合わせることができることです。
この連鎖の結果は、それ自体がさらなる連鎖と結合を可能にする
CompletableFuture
です。このアプローチは関数型言語ではいたるところにあり、しばしばモナドデザインパターンと呼ばれます。
-
次の例では、
thenCompose
メソッドを使用して2つの
Futures
を順番にチェーンしています。
このメソッドは、
CompletableFuture
インスタンスを返す関数を取ります。この関数の引数は、前の計算ステップの結果です。これにより、次の
CompletableFuture
のlambda内でこの値を使用できます。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
thenCompose
メソッドと
thenApply
は、モナディックパターンの基本的な構成要素を実装します。これらは、Java 8でも利用可能な
Stream
および
Optional
クラスの
map
および
flatMap
メソッドと密接に関連しています。
どちらのメソッドも関数を受け取り、それを計算結果に適用しますが、
thenCompose
(
flatMap
)メソッドは、同じ型の別のオブジェクトを返す関数を受け取ります。この機能構造により、これらのクラスのインスタンスを構成要素として構成することができます。
2つの独立した
Futures
を実行してその結果で何かをしたい場合は、2つの結果を処理するために2つの引数を持つ
Future
と
Function
を受け入れる
thenCombine
メソッドを使用します。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
もっと単純な場合は、2つの「結果」で何かをしたいが、結果の値を「将来のチェーン」に渡す必要がない場合です。
thenAcceptBoth
メソッドが役に立ちます。
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
7.
thenApply()
と
thenCompose()
の違い
前のセクションでは、
thenApply()
と
thenCompose()
に関する例を示しました。どちらのAPIも
CompletableFuture
呼び出しをチェーニングするのに役立ちますが、これら2つの関数の使用法は異なります。
7.1.
thenApply()
-
このメソッドは、前の呼び出しの結果を処理するために使用されます。** ただし、注意すべき重要な点は、戻り型はすべての呼び出しの組み合わせになるということです。
そのため、このメソッドは
__CompletableFuture
__callの結果を変換したい場合に便利です。
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
7.2.
thenCompose()
thenCompose()
メソッドは、どちらも新しいCompletion Stageを返すという点で
thenApply()
と似ています。ただし、
thenCompose()
は前のステージを引数
として使用します。
thenApply()で観察したように、入れ子になった未来ではなく、結果を直接
Future__に返して直接返します
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
そのため、
CompletableFuture
メソッドをチェーン化することを考えている場合は、
thenCompose()
を使用することをお勧めします。
また、これら2つの方法の違いはhttps://www.baeldung.com/java-difference-map-and-flatmap[
map()
と
flatMap()
]の違いに似ています。
8複数の
未来
を並列で実行する
複数の
Futures
を並列に実行する必要があるときは、通常それらすべてが実行されるのを待ってから、それらを組み合わせた結果を処理します。
CompletableFuture.allOf
静的メソッドを使用すると、var-argとして指定されたすべての
Futures
の完了を待つことができます。
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
//...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
CompletableFuture.allOf()
の戻り値の型は
CompletableFuture <Void>
です。このメソッドの制限は、すべての
Futures
の結合結果が返されないことです。代わりに、
Futures
から手動で結果を取得する必要があります。幸いなことに、
CompletableFuture.join()
メソッドとJava 8 Streams APIによって、それが簡単になります。
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
CompletableFuture.join()
メソッドは
get
メソッドと似ていますが、
Future
が正常に完了しない場合に未チェックの例外をスローします。これにより、
Stream.map()
メソッドのメソッド参照として使用できます。
9エラー処理
一連の非同期計算ステップでエラー処理を行うには、
throw/catch
idiomを同様に調整する必要がありました。
構文ブロックで例外をキャッチする代わりに、
CompletableFuture
クラスを使用すると、特別な
handle
メソッドで例外を処理できます。このメソッドは2つのパラメータを受け取ります:計算の結果(正常に終了した場合)とスローされた例外(何らかの計算ステップが正常に終了しなかった場合)です。
次の例では、名前が指定されていないためにグリーティングの非同期計算がエラーで終了したときに
handle
メソッドを使用してデフォルト値を指定します。
String name = null;
//...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
別のシナリオとして、最初の例のように
Future
を手動で値で補完したいが、例外でそれを補完することもできるとします。
completeExceptionally
メソッドはそのためのものです。次の例の
completableFuture.get()
メソッドは、原因として
RuntimeException
を指定して
ExecutionException
をスローします。
CompletableFuture<String> completableFuture = new CompletableFuture<>();
//...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
//...
completableFuture.get();//ExecutionException
上記の例では、
handle
メソッドを使用して例外を非同期的に処理できましたが、
get
メソッドを使用すると、同期的な例外処理のより一般的な方法を使用できます。
10非同期メソッド
CompletableFuture
クラスの流暢なAPIのほとんどのメソッドには、
Async
ポストフィックスを持つ2つの追加の変形があります。これらのメソッドは通常、別のスレッドで対応する実行ステップを実行することを目的としています。
Async
接尾辞のないメソッドは呼び出しスレッドを使って次の実行ステージを実行します。
Executor
引数のない
Async
メソッドは、
ForkJoinPool.commonPool()
メソッドでアクセスされる
Executor
の一般的な
fork/join
pool実装を使用してステップを実行します。
Executor
引数を持つ
Async
メソッドは、渡された
Executor
を使用してステップを実行します。
これは、
Function
インスタンスを使用して計算の結果を処理する修正された例です。唯一目に見える違いは
thenApplyAsync
メソッドです。しかし内部では、関数のアプリケーションは
ForkJoinTask
インスタンスにラップされています(
fork/join
フレームワークの詳細については、記事リンク:/java-fork-joinを参照してください。 )これにより、計算をさらに並列化し、システムリソースをより効率的に使用することができます。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
11. JDK 9
CompletableFuture
API
Java 9では、
CompletableFuture
APIが次のように変更されてさらに拡張されました。
-
新しいファクトリーメソッドが追加されました
-
遅延とタイムアウトのサポート
サブクラス化のサポートが改善されました。
新しいインスタンスAPIが導入されました。
-
Executor defaultExecutor()
-
CompletableFuture <U> newIncompleteFuture()
-
CompletableFuture <T> copy()
-
CompletionStage <T> minimalCompletionStage()
-
__CompletableFuture <T> completeAsync(サプライヤ<?extends T>サプライヤ、
エグゼキュータエグゼキュータ)
**
CompletableFuture <T> completeAsync(サプライヤ<?extends T>サプライヤ)__
-
CompletableFuture <T>またはTimeout(長いタイムアウト、TimeUnit単位)
-
__CompletableFuture <T> completeOnTimeout(T値、長いタイムアウト、
TimeUnitの単位)__
また、いくつかの静的なユーティリティメソッドもあります。
-
__Executor delayedExecutor(長い遅延、TimeUnit単位、Executor
エグゼキュータ)
**
Executor delayedExecutor(長い遅延、TimeUnit単位)__
-
<U> CompletionStage <U> completedStage(U値)
-
__ <U> CompletionStage <U> failedStage(Throwable ex)
-
<U> CompletableFuture <U> failedFuture(Throwable ex)
最後に、タイムアウトに対処するために、Java 9ではさらに2つの新しい機能が導入されました。
-
orTimeout()
-
completeOnTimeout()
詳細な記事はこちらです。
12. 結論
この記事では、
CompletableFuture
クラスのメソッドと典型的なユースケースについて説明しました。
この記事のソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]から入手できます。