1. 序章

グアバは私たちに ListenableFuture デフォルトのJavaよりも強化されたAPIを使用未来。 これをどのように活用できるか見てみましょう。

2. Future ListenableFuture 、および Futures

これらの異なるクラスとは何か、およびそれらが互いにどのように関連しているかを簡単に見てみましょう。

2.1. 将来

Java 5以降、 java.util.concurrent.Futureを使用して非同期タスクを表すことができます。

Future を使用すると、タスクのキャンセルのサポートとともに、すでに完了した、または将来完了する可能性のあるタスクの結果にアクセスできます。

2.2. ListenableFuture

java.util.concurrent.Future を使用するときに欠けている機能の1つは、完了時に実行するリスナーを追加する機能です。これは、最も一般的な非同期フレームワークによって提供される一般的な機能です。

Guavaは、リスナーcom.google.common.util.concurrent.ListenableFuture。に接続できるようにすることでこの問題を解決します。

2.3. 先物

グアバは私たちに便利なクラスを提供します com.google.common.util.concurrent.Futures 彼らとの作業を容易にするために ListenableFuture。

このクラスは、 ListenableFuture、と対話するさまざまな方法を提供します。その中には、成功/失敗のコールバックを追加し、集計または変換を使用して複数の未来を調整できるようにするサポートがあります。

3. 簡単な使用法

ここで、ListenableFutureを最も簡単な方法で使用する方法を見てみましょう。 コールバックの作成と追加。

3.1. ListenableFutureの作成

ListenableFutureを取得する最も簡単な方法は、タスクをListeningExecutorService に送信することです(通常のExecutorServiceを使用して通常のFutureを取得する方法とよく似ています)。 :

ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);

ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
    TimeUnit.MILLISECONDS.sleep(500); // long running task
    return 5;
});

使用方法に注意してください MoreExecutors 私たちを飾るためのクラス ExecutorService として ListeningExecutorService。 参照できます Guavaでのスレッドプールの実装詳細については MoreExecutors

Future を返すAPIがすでにあり、それを ListenableFuture に変換する必要がある場合、具体的な実装 ListenableFutureTaskを初期化することで、これを簡単に行うことができます

// old api
public FutureTask<String> fetchConfigTask(String configKey) {
    return new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
    return ListenableFutureTask.create(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

これらのタスクは、Executorに送信しない限り実行されないことに注意する必要があります。 ListenableFutureTaskと直接対話することは一般的な使用法ではなく、まれなシナリオでのみ実行されます(例:独自のExecutorService)を実装します。 実際の使用法については、GuavaのAbstractListeningExecutorServiceを参照してください。

非同期タスクでListeningExecutorServiceまたは提供されているFuturesユーティリティメソッドを使用できない場合は、com.google.common.util.concurrent.SettableFutureを使用することもできます。将来の値を手動で設定する必要があります。 より複雑な使用法については、com.google.common.util.concurrent.AbstractFuture。を検討することもできます。

3.2. リスナー/コールバックの追加

リスナーをListenableFutureに追加する1つの方法は、Futures.addCallback()にコールバックを登録し、成功または失敗が発生したときに結果または例外にアクセスできるようにすることです。

Executor listeningExecutor = Executors.newSingleThreadExecutor();

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        // do on success
    }

    @Override
    public void onFailure(Throwable t) {
        // do on failure
    }
}, listeningExecutor);

リスナーをListenableFutureに直接追加することで、リスナーを追加することもできます。このリスナーは、futureが正常に完了するか例外的に完了すると実行されることに注意してください。 また、非同期タスクの結果にアクセスできないことに注意してください。

Executor listeningExecutor = Executors.newSingleThreadExecutor();

int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);

4. 複雑な使用法

ここで、より複雑なシナリオでこれらの先物をどのように使用できるかを見てみましょう。

4.1. ファンイン

複数の非同期タスクを呼び出してその結果を収集する必要がある場合があります。これは通常、ファンイン操作と呼ばれます。

Guavaは、これを行う2つの方法を提供します。 ただし、要件に応じて正しい方法を選択する場合は注意が必要です。 次の非同期タスクを調整する必要があると仮定しましょう。

ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");

複数の先物をファンインする1つの方法は、Futures.allAsList()メソッドを使用することです。 これにより、すべての未来が成功した場合に、すべての未来の結果を収集できます。 提供された先物の順序で。 これらの先物のいずれかが失敗した場合、結果全体が失敗した未来になります。

ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // do on all futures success
    }

    @Override
    public void onFailure(Throwable t) {
        // handle on at least one failure
    }
}, someExecutor);

失敗したかどうかに関係なく、すべての非同期タスクの結果を収集する必要がある場合は、Futures.successfulAsList()を使用できます。 これにより、結果が引数に渡されたタスクと同じ順序になるリストが返され、失敗したタスクには、リスト内のそれぞれの位置にnullが割り当てられます。

ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // handle results. If task2 failed, then configResults.get(1) == null
    }

    @Override
    public void onFailure(Throwable t) {
        // handle failure
    }
}, listeningExecutor);

上記の使用法では、将来のタスクが通常成功時にnullを返す場合、失敗したタスクと区別がつかないことに注意する必要があります(これにより結果もnullに設定されます)。

4.2. コンバイナーによるファンイン

異なる結果を返す複数の先物を調整する必要がある場合、上記の解決策では不十分な場合があります。 この場合、ファンイン操作のコンバイナーバリアントを使用して、この未来の組み合わせを調整できます。

単純なファンイン操作と同様に、Guavaには2つのバリエーションがあります。 1つはすべてのタスクが正常に完了したときに成功し、もう1つはFutures.whenAllSucceed()メソッドとFutures.whenAllComplete()メソッドを使用して一部のタスクが失敗した場合でも成功します。

Futures.whenAllSucceed()を使用して、複数の先物からのさまざまな結果タイプを組み合わせる方法を見てみましょう。

ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
    .call(() -> {
        int cartId = Futures.getDone(cartIdTask);
        String customerName = Futures.getDone(customerNameTask);
        List<String> cartItems = Futures.getDone(cartItemsTask);
        return new CartInfo(cartId, customerName, cartItems);
    }, someExecutor);

Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
    @Override
    public void onSuccess(@Nullable CartInfo result) {
        //handle on all success and combination success
    }

    @Override
    public void onFailure(Throwable t) {
        //handle on either task fail or combination failed
    }
}, listeningExecService);

一部のタスクの失敗を許可する必要がある場合は、 Futures.whenAllComplete()を使用できます。 セマンティクスは上記とほとんど同じですが、 Futures.getDone()が呼び出されると、失敗したフューチャーはExecutionExceptionをスローすることに注意してください。

4.3. 変換

成功したら、将来の結果を変換する必要がある場合があります。 Guavaは、 Futures.transform() Futures.lazyTransform()を使用してこれを行う2つの方法を提供します。

どうすればいいのか見てみましょう Futures.transform()を使用して、futureの結果を変換します。 これは、変換の計算が重くない限り使用できます。

ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

Function<List<String>, Integer> itemCountFunc = cartItems -> {
    assertNotNull(cartItems);
    return cartItems.size();
};

ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);

Futures.lazyTransform()を使用して、変換関数をjava.util.concurrent.Future。に適用することもできます。このオプションは適用されないことに注意する必要があります。 ListenableFuture を返しますが、通常の java .util.concurrent.Future であり、結果のフューチャーで get()が呼び出されるたびに変換関数が適用されます。 。

4.4. 未来をつなぐ

私たちの未来が他の未来を呼ぶ必要がある状況に出くわすかもしれません。 このような場合、Guavaは async()バリアントを提供して、これらの未来を安全に連鎖させて次々に実行します。

Futures.submitAsync()を使用して、送信されたCallable内からfutureを呼び出す方法を見てみましょう:

AsyncCallable<String> asyncConfigTask = () -> {
    ListenableFuture<String> configTask = service.fetchConfig("config.a");
    TimeUnit.MILLISECONDS.sleep(500); //some long running task
    return configTask;
};

ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);

ある未来の結果が別の未来の計算に供給される真の連鎖が必要な場合は、 Futures.transformAsync()を使用できます。

ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
    ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
    TimeUnit.MILLISECONDS.sleep(500); // some long running task
    return generatePasswordTask;
};

ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);

Guavaは、 Futures.scheduleAsync()および Futures.catchingAsync()も提供して、スケジュールされたタスクを送信し、エラー回復のフォールバックタスクを提供します。 これらはさまざまなシナリオに対応していますが、他の async()呼び出しと類似しているため、ここでは説明しません。

5. 使用上の注意事項と禁止事項

ここで、先物を扱うときに遭遇する可能性のあるいくつかの一般的な落とし穴と、それらを回避する方法を調べてみましょう。

5.1. 働くvs。 リスニングエグゼキュータ

Guavaフューチャーを使用する場合は、ワーキングエグゼキューターとリスニングエグゼキューターの違いを理解することが重要です。 たとえば、構成をフェッチする非同期タスクがあるとします。

public ListenableFuture<String> fetchConfig(String configKey) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

また、上記の未来にリスナーを付けたいとしましょう。

ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);

lExecService は非同期タスクを実行しているエグゼキューターであり、listeningExecutorはリスナーが呼び出されるエグゼキューターであることに注意してください。

上記のように、リスナーとワーカーが同じスレッドプールリソースをめぐって競合するシナリオを回避するために、これら2つのエグゼキューターを分離することを常に検討する必要があります。実行。 または、ひどく書かれたヘビーウェイトリスナーは、私たちの重要なヘビーデューティータスクをブロックしてしまいます。

5.2. directExecutor()に注意してください

MoreExecutors.directExecutor()および MoreExecutors.newDirectExecutorService()を単体テストで使用して、非同期実行の処理を容易にすることはできますが、本番コードでの使用には注意が必要です。

上記のメソッドからエグゼキュータを取得すると、ヘビーウェイトであれリスナーであれ、エグゼキュータに送信するすべてのタスクが現在のスレッドで実行されます。 現在の実行コンテキストが高スループットを必要とするコンテキストである場合、これは危険な場合があります。

たとえば、 directExecutor を使用し、UIスレッドでヘビーウェイトタスクを送信すると、UIスレッドが自動的にブロックされます。

また、リスナー が他のすべてのリスナー( directExecutor に関与していないリスナーも含む)の速度を低下させるというシナリオに直面する可能性もあります。 これは、GuavaがそれぞれのExecutorsのwhileループですべてのリスナーを実行しますが、directExecutorによってリスナーがと同じスレッドで実行されるためです。 X199X]whileループ。

5.3. 先物の入れ子は悪い

連鎖した未来を扱うときは、ネストされた未来を作成するような方法で、ある未来を別の未来の内側から呼び出さないように注意する必要があります。

public ListenableFuture<String> generatePassword(String username) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return username + "123";
    });
}

String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
    final String username = firstName.replaceAll("[^a-zA-Z]+", "")
        .concat("@service.com");
    return generatePassword(username);
});

ListenableFutureを持つコードを見た場合 >、それならこれはひどく書かれた未来であることを知っておくべきですなぜなら、外側の未来のキャンセルと完了が競合し、キャンセルが内側の未来に伝播しない可能性があるからです。

上記のシナリオを見る場合は、常に Futures.async()バリアントを使用して、これらの連鎖した未来を接続された方法で安全にアンラップする必要があります。

5.4. JdkFutureAdapters.listenInPoolThread()に注意してください

グアバは、私たちがそれを活用できる最善の方法をお勧めします ListenableFuture を使用するすべてのコードを変換することによってです未来 ListenableFuture。 

一部のシナリオでこの変換が実行できない場合は、 Guavaは、JdkFutureAdapters.listenInPoolThread()オーバーライドを使用してこれを行うためのアダプターを提供します。 これは役立つように思えるかもしれませんが、 Guavaは、これらは重量級のアダプターであり、可能な限り避ける必要があると警告しています。

6. 結論

この記事では、Guavaの ListenableFuture を使用して先物の使用法を充実させる方法と、 FuturesAPIを使用してこれらの先物を簡単に操作できるようにする方法を説明しました。

また、これらのフューチャーおよび提供されたエグゼキュータを操作するときに発生する可能性のある一般的なエラーもいくつか見られます。

いつものように、私たちの例を含む完全なソースコードは、GitHubから入手できます。