RxJavaフック

1. 概要

このチュートリアルでは、https://www.baeldung.com/rx-java [RxJava]フックについて学習します。 さまざまな状況でフックがどのように機能するかを示すために、簡単な例を作成します。

2. RxJavaフックとは何ですか?

名前が示すように、RxJavaフックを使用すると、_Observable、_Completable _、_ Maybe _、_ Flowable、__、* __ * Single * .__のライフサイクルにフックできます。さらに、RxJavaを使用すると、__ Schedulersによって返されるスケジューラにライフサイクルフックを追加できます。 __さらに、フックを使用してグローバルエラーハンドラを指定することもできます。
RxJava 1では、クラス_RxJavaHooks_を使用してフックを定義します。 ただし、フック機構はRxJava 2で完全に書き直されています。 *クラス_RxJavaHooks_は、フックを定義するために使用できなくなりました。 代わりに、_RxJavaPlugins_を使用してライフサイクルフックを実装する必要があります*。
_RxJavaPlugins_クラスには、フックを設定するためのいくつかのセッターメソッドがあります。 これらのフックはグローバルです。 それらを設定したら、_RxJavaPlugins_クラスの_reset()_メソッドを呼び出すか、個々のフックのセッターメソッドを呼び出して削除する必要があります。

3. エラー処理のフック

_setErrorHandler()_メソッドを使用して、ダウンストリームのライフサイクルが既にその最終状態に達したために送出できないエラーを処理できます。 エラーハンドラを実装してテストする方法を見てみましょう。
RxJavaPlugins.setErrorHandler(throwable -> {
    hookCalled = true;
});

Observable.error(new IllegalStateException()).subscribe();

assertTrue(hookCalled);
すべての例外がそのままスローされるわけではありません。 ただし、RxJavaは、スローされたエラーが、そのままの状態で通過する必要がある既に指定されたバグケースの1つであるかどうかをチェックします。そうでない場合は、_UndeliverableException_にラップされます。 バグケースとして名前が付けられている例外は次のとおりです。
  • OnErrorNotImplementedException –ユーザーが追加を忘れた場合
    _subscribe()_メソッドの_onError_ハンドラー

  • MissingBackpressureException –_オペレーターのバグまたは
    同時_onNext

  • _IllegalStateException –_一般的なプロトコル違反が発生した場合

  • _NullPointerException –_標準のNULLポインター例外

  • _IllegalArgumentException –_無効なユーザー入力のため

  • _CompositeException –_例外処理中のクラッシュが原因

*4. Completable *のフック

RxJava link:/rxjava-completable[_Completable_]には、2つのライフサイクルフックがあります。 今それらを見てみましょう。

* 4.1。 setOnCompletableAssembly *

RxJavaは、_Completable_で演算子とソースをインスタンス化するときにこのフックを呼び出します。 フック関数の引数として提供される現在の_Completable_オブジェクトを、その操作で使用できます。
RxJavaPlugins.setOnCompletableAssembly(completable -> {
    hookCalled = true;
    return completable;
});

Completable.fromSingle(Single.just(1));

assertTrue(hookCalled);

* 4.2。 setOnCompletableSubscribe *

RxJavaは、サブスクライバが_Completable_にサブスクライブする前にこのフックを呼び出します。
RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
    hookCalled = true;
    return observer;
});

Completable.fromSingle(Single.just(1)).test();

assertTrue(hookCalled);

*5. Observable *のフック

次に、_Observable_のRxJavaの3つのライフサイクルフックを見てみましょう。

* 5.1。 setOnObservableAssembly *

RxJavaは、_Observable_で演算子とソースをインスタンス化するときにこのフックを呼び出します。
RxJavaPlugins.setOnObservableAssembly(observable -> {
    hookCalled = true;
    return observable;
});

Observable.range(1, 10);

assertTrue(hookCalled);

* 5.2。 setOnObservableSubscribe *

RxJavaは、サブスクライバが_Observable_にサブスクライブする前にこのフックを呼び出します。
RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
    hookCalled = true;
    return observer;
});

Observable.range(1, 10).test();

assertTrue(hookCalled);

* 5.3。 setOnConnectableObservableAssembly *

このフックは、_ ConnectableObservable _を対象としています。 _ConnectableObservable_は、_Observable_自体のバリアントです。 唯一の違いは、サブスクライブされたときにアイテムの発行を開始するのではなく、その_connect()_メソッドが呼び出されたときのみであるということです。
RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
    hookCalled = true;
    return connectableObservable;
});

ConnectableObservable.range(1, 10).publish().connect();

assertTrue(hookCalled);

*6. Flowable *のフック

次に、https://www.baeldung.com/rxjava-2-flowable [_Flowable_]に定義されているライフサイクルフックを見てみましょう。

* 6.1。 setOnFlowableAssembly *

RxJavaは、_Flowable_で演算子とソースをインスタンス化するときにこのフックを呼び出します。
RxJavaPlugins.setOnFlowableAssembly(flowable -> {
    hookCalled = true;
    return flowable;
});

Flowable.range(1, 10);

assertTrue(hookCalled);

* 6.2。 setOnFlowableSubscribe *

RxJavaは、サブスクライバが_Flowable_にサブスクライブする前にこのフックを呼び出します。
RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
    hookCalled = true;
    return observer;
});

Flowable.range(1, 10).test();

assertTrue(hookCalled);

* 6.3。 setOnConnectableFlowableAssembly *

RxJavaは、_ConnectableFlowable_で演算子とソースをインスタンス化するときにこのフックを呼び出します。 _ConnectableObservable_と同様に、_ConnectableFlowable_も_connect()_メソッドを呼び出したときにのみアイテムの発行を開始します。
RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
    hookCalled = true;
    return connectableFlowable;
});

ConnectableFlowable.range(1, 10).publish().connect();

assertTrue(hookCalled);

* 6.4。 setOnParallelAssembly *

_ParallelFlowable_は、複数のパブリッシャー間の並列処理を実現するためのものです。 RxJavaは、_ParallelFlowable_で演算子とソースをインスタンス化するときに、_setOnParallelAssembly()_フックを呼び出します。
RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
    hookCalled = true;
    return parallelFlowable;
});

Flowable.range(1, 10).parallel();

assertTrue(hookCalled);

*7. Maybe *のフック

link:/rxjava-maybe[_Maybe_]エミッターには、ライフサイクルを制御するために定義された2つのフックがあります。

* 7.1。 setOnMaybeAssembly *

RxJavaは、_Maybe_で演算子とソースをインスタンス化するときにこのフックを呼び出します。
RxJavaPlugins.setOnMaybeAssembly(maybe -> {
    hookCalled = true;
    return maybe;
});

Maybe.just(1);

assertTrue(hookCalled);

* 7.2。 setOnMaybeSubscribe *

RxJavaは、サブスクライバが_Maybe_にサブスクライブする前にこのフックを呼び出します。
RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
    hookCalled = true;
    return observer;
});

Maybe.just(1).test();

assertTrue(hookCalled);

*8. Single *のフック

RxJavaは、_Single_エミッターの基本的な2つのフックも定義します。

* 8.1。 setOnSingleAssembly *

RxJavaは、_Single_で演算子とソースをインスタンス化するときにこのフックを呼び出します。
RxJavaPlugins.setOnSingleAssembly(single -> {
    hookCalled = true;
    return single;
});

Single.just(1);

assertTrue(hookCalled);

* 8.2。 setOnSingleSubscribe *

RxJavaは、サブスクライバが_Single_をサブスクライブする前にこのフックを呼び出します。
RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
    hookCalled = true;
    return observer;
});

Single.just(1).test();

assertTrue(hookCalled);

*9. Schedulers *のフック

RxJavaエミッターと同様に、https://www.baeldung.com/rxjava-schedulers [_Schedulers_]には、ライフサイクルを制御するための多数のフックもあります。 RxJavaは、あらゆるタイプの_Schedulers._を使用するときに呼び出される共通のフックを定義します。さらに、さまざまな_Schedulers_に固有のフックを実装することもできます。

* 9.1。 setScheduleHandler *

RxJavaは、操作にスケジューラを使用するときにこのフックを呼び出します。
RxJavaPlugins.setScheduleHandler((runnable) -> {
    hookCalled = true;
    return runnable;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

hookCalled = false;

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled);
_single()_スケジューラと_computation()_スケジューラの両方で操作を繰り返したため、これを実行すると、テストケースはコンソールにメッセージを2回出力します。

* 9.2。 Computation Scheduler *のフック

計算スケジューラには、_setInitComputationSchedulerHandler_と_setComputationSchedulerHandler._という2つのフックがあります。
RxJavaは、計算スケジューラを初期化するときに、_setInitComputationSchedulerHandler_関数を使用して設定したフックを呼び出します。 さらに、_Schedulers.computation()_でタスクをスケジュールするときに、_setComputationSchedulerHandler function_を使用して設定したフックを呼び出します。
RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled && initHookCalled);

* 9.3。 IO Scheduler *のフック

_IO_スケジューラには、_setInitIoSchedulerHandler_と_setIoSchedulerHandler_の2つのフックもあります。
RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.io())
  .test();

assertTrue(hookCalled && initHookCalled);

* 9.4。 Single Scheduler *のフック

それでは、_Single_スケジューラーのフックを見てみましょう。
RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

assertTrue(hookCalled && initHookCalled);

* 9.5。 NewThread Scheduler *のフック

他のスケジューラと同様に、_NewThread_スケジューラも2つのフックを定義します。
RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 15)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.newThread())
  .test();

assertTrue(hookCalled && initHookCalled);

10. 結論

このチュートリアルでは、さまざまなRxJavaライフサイクルフックとは何か、それらを実装する方法を学びました。 これらのフックの中で、エラー処理フックは最も注目に値するものです。 ただし、サブスクライバーの数やその他の特定のユースケースを記録するなど、監査目的で他のものを使用することはできます。
そして、いつものように、ここで説明した短い例はすべてhttps://github.com/eugenp/tutorials/tree/master/rxjava-2[Githubで]にあります。