1前書き

この記事は、Java 8 Concurrency APIの改良として導入された

CompletableFuture

クラスの機能と使用例についてのガイドです。


2 Java

での非同期計算

非同期計算は、考えるのが難しいです。通常、私たちはあらゆる計算を一連のステップとして考えたいと思います。しかし、非同期計算の場合、コールバックとして表されるアクションは、コード全体に分散しているか、または互いの内側に深くネストされている傾向があります。

いずれかの手順で発生する可能性があるエラーを処理する必要がある場合、状況はさらに悪化します。

非同期計算の結果として機能するためにJava 5では

Future

インタフェースが追加されましたが、これらの計算を組み合わせたり、起こりうるエラーを処理するためのメソッドはありませんでした。

  • Java 8では、

    CompletableFuture

    クラスが導入されました**

    Future

    インタフェースとともに、それはまた

    CompletionStage

    インタフェースを実装しました。このインタフェースは、他のステップと組み合わせることができる非同期計算ステップの規約を定義します。


CompletableFuture

は同時に、非同期計算ステップの作成、結合、実行、およびエラー処理のための、約50種類の異なるメソッドを持つビルディングブロックおよびフレームワークです。

そのような大規模なAPIは圧倒的になる可能性がありますが、これらは主にいくつかの明確で異なる使用例に分類されます。


3

CompletableFuture

を単純な

Future


として使用する

まず第一に、

CompletableFuture

クラスは

Future

インターフェースを実装しているので、

Future

実装として

使用できますが、追加の補完ロジック

があります。

たとえば、将来の結果を表すために引数なしのコンストラクタを使用してこのクラスのインスタンスを作成し、それをコンシューマに配布し、

complete

メソッドを使用して将来的に完成させることができます。この結果が提供されるまで、コンシューマは

get

メソッドを使用して現在のスレッドをブロックできます。

以下の例では、

CompletableFuture

インスタンスを作成してから別のスレッドで計算をスピンオフしてすぐに

Future

を返すメソッドがあります。

計算が完了すると、メソッドは結果を

complete

メソッドに提供することで

Future

を完成させます。

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture
      = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

計算をスピンオフするには、記事リンクに記載されている

Executor

APIを使用します。/thread-pool-java-and-guava生スレッドを含む任意の並行性メカニズムまたはAPIと一緒に使用します。


calculateAsync

メソッドが

Future

インスタンスを返すことに注意してください。

結果をブロックする準備ができたら、単純にメソッドを呼び出し、

Future

インスタンスを受け取り、その上で

get

メソッドを呼び出します。


get

メソッドはいくつかのチェックされた例外、すなわち

ExecutionException

(計算中に発生した例外のカプセル化)と

InterruptedException

(メソッドを実行しているスレッドが中断されたことを示す例外)を投げます。

Future<String> completableFuture = calculateAsync();
//...

String result = completableFuture.get();
assertEquals("Hello", result);

  • すでに計算結果を知っている場合は、この計算結果を表す引数を付けてstatic

    completedFuture

    メソッドを使用できます。それから

    Future



    get

    メソッドは決してブロックされず、代わりにこの結果を即座に返します。

Future<String> completableFuture =
  CompletableFuture.completedFuture("Hello");
//...

String result = completableFuture.get();
assertEquals("Hello", result);

別のシナリオとして、

Future

** の実行をキャンセルしたいかもしれません。

結果を見つけることができず、非同期実行を完全にキャンセルすることにしたとします。これは

Future

s

cancel

メソッドで行うことができます。このメソッドは

boolean

引数

mayInterruptIfRunning

を受け取りますが、

CompletableFuture

の場合、

CompletableFuture

の処理を制御するための割り込みは使用されないため、効果はありません。

これが非同期メソッドの修正版です。

public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });

    return completableFuture;
}


Future.get()

メソッドを使用して結果をブロックするときに、将来がキャンセルされた場合は

CancellationException

がスローされます。

Future<String> future = calculateAsyncWithCancellation();
future.get();//CancellationException


4カプセル化計算ロジックを使った

CompletableFuture


上記のコードを使用すると、同時実行の任意のメカニズムを選択できますが、この定型句をスキップして単純に非同期でコードを実行する場合はどうなりますか。

静的メソッド

runAsync

および

supplyAsync

を使用すると、対応する

Runnable

および

Supplier

機能型から

CompletableFuture

インスタンスを作成できます。


Runnable



Supplier

はどちらも、新しいJava 8機能のおかげで、インスタンスをラムダ式として渡すことを可能にする機能インターフェースです。


Runnable

インタフェースは、スレッドで使用されているものと同じ古いインタフェースであり、値を返すことはできません。


Supplier

インタフェースは、引数を持たず、パラメータ化された型の値を返す単一のメソッドを持つ汎用の機能的インタフェースです。

これにより、計算を行い結果を返すラムダ式として

Supplier

のインスタンスを提供することができます。これはとても簡単です。

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");
//...

assertEquals("Hello", future.get());


5非同期計算の処理結果

計算結果を処理する最も一般的な方法は、それを関数に渡すことです。

thenApply

メソッドは、

Function

インスタンスを受け取り、それを使用して結果を処理し、関数によって返された値を保持する

Future

を返します。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());


Future

チェーンの下の値を返す必要がない場合は、

Consumer

機能インターフェースのインスタンスを使用できます。その単一のメソッドはパラメータを取り、

void

を返します。


CompletableFuture

にこのユースケースのメソッドがあります –

thenAccept

メソッドは

Consumer

を受け取り、それに計算結果を渡します。最後の

future.get()呼び出しは、

Void__型のインスタンスを返します。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

最後に、あなたが計算の値を必要とせず、またチェーンの最後で何らかの値を返したくない場合は、

Runnable

λを

thenRun

メソッドに渡すことができます。次の例では、

future.get()

メソッドが呼び出された後、コンソールに単純に1行印刷します。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();


6. 先物の組み合わせ


CompletableFuture

APIの最も優れた点は、** CompletableFuture__インスタンスを一連の計算ステップで組み合わせることができることです。

この連鎖の結果は、それ自体がさらなる連鎖と結合を可能にする

CompletableFuture

です。このアプローチは関数型言語ではいたるところにあり、しばしばモナドデザインパターンと呼ばれます。

  • 次の例では、

    thenCompose

    メソッドを使用して2つの

    Futures

    を順番にチェーンしています。

このメソッドは、

CompletableFuture

インスタンスを返す関数を取ります。この関数の引数は、前の計算ステップの結果です。これにより、次の

CompletableFuture

のlambda内でこの値を使用できます。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());


thenCompose

メソッドと

thenApply

は、モナディックパターンの基本的な構成要素を実装します。これらは、Java 8でも利用可能な

Stream

および

Optional

クラスの

map

および

flatMap

メソッドと密接に関連しています。

どちらのメソッドも関数を受け取り、それを計算結果に適用しますが、

thenCompose



flatMap

)メソッドは、同じ型の別のオブジェクトを返す関数を受け取ります。この機能構造により、これらのクラスのインスタンスを構成要素として構成することができます。

2つの独立した

Futures

を実行してその結果で何かをしたい場合は、2つの結果を処理するために2つの引数を持つ

Future



Function

を受け入れる

thenCombine

メソッドを使用します。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

もっと単純な場合は、2つの「結果」で何かをしたいが、結果の値を「将来のチェーン」に渡す必要がない場合です。

thenAcceptBoth

メソッドが役に立ちます。

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));


7.

thenApply()



thenCompose()


の違い

前のセクションでは、

thenApply()



thenCompose()

に関する例を示しました。どちらのAPIも

CompletableFuture

呼び出しをチェーニングするのに役立ちますが、これら2つの関数の使用法は異なります。


7.1.

thenApply()


  • このメソッドは、前の呼び出しの結果を処理するために使用されます。** ただし、注意すべき重要な点は、戻り型はすべての呼び出しの組み合わせになるということです。

そのため、このメソッドは

__CompletableFuture

__callの結果を変換したい場合に便利です。

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);


7.2.

thenCompose()



thenCompose()

メソッドは、どちらも新しいCompletion Stageを返すという点で

thenApply()

と似ています。ただし、


thenCompose()

は前のステージを引数

として使用します。

thenApply()で観察したように、入れ子になった未来ではなく、結果を直接

Future__に返して直接返します

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

そのため、

CompletableFuture

メソッドをチェーン化することを考えている場合は、

thenCompose()

を使用することをお勧めします。

また、これら2つの方法の違いはhttps://www.baeldung.com/java-difference-map-and-flatmap[

map()



flatMap()

]の違いに似ています。


8複数の

未来

を並列で実行する

複数の

Futures

を並列に実行する必要があるときは、通常それらすべてが実行されるのを待ってから、それらを組み合わせた結果を処理します。


CompletableFuture.allOf

静的メソッドを使用すると、var-argとして指定されたすべての

Futures

の完了を待つことができます。

CompletableFuture<String> future1
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture
  = CompletableFuture.allOf(future1, future2, future3);
//...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());


CompletableFuture.allOf()

の戻り値の型は

CompletableFuture <Void>

です。このメソッドの制限は、すべての

Futures

の結合結果が返されないことです。代わりに、

Futures

から手動で結果を取得する必要があります。幸いなことに、

CompletableFuture.join()

メソッドとJava 8 Streams APIによって、それが簡単になります。

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);


CompletableFuture.join()

メソッドは

get

メソッドと似ていますが、

Future

が正常に完了しない場合に未チェックの例外をスローします。これにより、

Stream.map()

メソッドのメソッド参照として使用できます。


9エラー処理

一連の非同期計算ステップでエラー処理を行うには、

throw/catch

idiomを同様に調整する必要がありました。

構文ブロックで例外をキャッチする代わりに、

CompletableFuture

クラスを使用すると、特別な

handle

メソッドで例外を処理できます。このメソッドは2つのパラメータを受け取ります:計算の結果(正常に終了した場合)とスローされた例外(何らかの計算ステップが正常に終了しなかった場合)です。

次の例では、名前が指定されていないためにグリーティングの非同期計算がエラーで終了したときに

handle

メソッドを使用してデフォルト値を指定します。

String name = null;
//...

CompletableFuture<String> completableFuture
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

別のシナリオとして、最初の例のように

Future

を手動で値で補完したいが、例外でそれを補完することもできるとします。

completeExceptionally

メソッドはそのためのものです。次の例の

completableFuture.get()

メソッドは、原因として

RuntimeException

を指定して

ExecutionException

をスローします。

CompletableFuture<String> completableFuture = new CompletableFuture<>();
//...

completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));
//...

completableFuture.get();//ExecutionException

上記の例では、

handle

メソッドを使用して例外を非同期的に処理できましたが、

get

メソッドを使用すると、同期的な例外処理のより一般的な方法を使用できます。


10非同期メソッド


CompletableFuture

クラスの流暢なAPIのほとんどのメソッドには、

Async

ポストフィックスを持つ2つの追加の変形があります。これらのメソッドは通常、別のスレッドで対応する実行ステップを実行することを目的としています。


Async

接尾辞のないメソッドは呼び出しスレッドを使って次の実行ステージを実行します。

Executor

引数のない

Async

メソッドは、

ForkJoinPool.commonPool()

メソッドでアクセスされる

Executor

の一般的な

fork/join

pool実装を使用してステップを実行します。


Executor

引数を持つ

Async

メソッドは、渡された

Executor

を使用してステップを実行します。

これは、

Function

インスタンスを使用して計算の結果を処理する修正された例です。唯一目に見える違いは

thenApplyAsync

メソッドです。しかし内部では、関数のアプリケーションは

ForkJoinTask

インスタンスにラップされています(

fork/join

フレームワークの詳細については、記事リンク:/java-fork-joinを参照してください。 )これにより、計算をさらに並列化し、システムリソースをより効率的に使用することができます。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9

CompletableFuture

AP​​I

Java 9では、

CompletableFuture

APIが次のように変更されてさらに拡張されました。

  • 新しいファクトリーメソッドが追加されました

  • 遅延とタイムアウトのサポート

サブクラス化のサポートが改善されました。

新しいインスタンスAPIが導入されました。


  • Executor defaultExecutor()


  • CompletableFuture <U> newIncompleteFuture()


  • CompletableFuture <T> copy()


  • CompletionStage <T> minimalCompletionStage()

  • __CompletableFuture <T> completeAsync(サプライヤ<?extends T>サプライヤ、

エグゼキュータエグゼキュータ)

**

CompletableFuture <T> completeAsync(サプライヤ<?extends T>サプライヤ)__


  • CompletableFuture <T>またはTimeout(長いタイムアウト、TimeUnit単位)

  • __CompletableFuture <T> completeOnTimeout(T値、長いタイムアウト、

TimeUnitの単位)__

また、いくつかの静的なユーティリティメソッドもあります。

  • __Executor delayedExecutor(長い遅延、TimeUnit単位、Executor

エグゼキュータ)

**

Executor delayedExecutor(長い遅延、TimeUnit単位)__


  • <U> CompletionStage <U> completedStage(U値)

  • __ <U> CompletionStage <U> failedStage(Throwable ex)


  • <U> CompletableFuture <U> failedFuture(Throwable ex)

最後に、タイムアウトに対処するために、Java 9ではさらに2つの新しい機能が導入されました。


  • orTimeout()


  • completeOnTimeout()

詳細な記事はこちらです。


12. 結論

この記事では、

CompletableFuture

クラスのメソッドと典型的なユースケースについて説明しました。

この記事のソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency[over on GitHub]から入手できます。