RxJavaとエラー処理
1. 序章
この記事では、RxJavaを使用して例外とエラーを処理する方法を見ていきます。
まず、Observableは通常例外をスローしないことに注意してください。 代わりに、デフォルトでは、 ObservableはObserverのonError()メソッドを呼び出し、回復不能なエラーが発生したことをオブザーバーに通知し、Observerのメソッド。
これから紹介するエラー処理演算子は、 Observable シーケンスを再開または再試行することにより、デフォルトの動作を変更します。
2. Mavenの依存関係
まず、pom.xmlにRxJavaを追加しましょう。
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.3</version>
</dependency>
アーティファクトの最新バージョンはここにあります。
3. エラー処理
エラーが発生した場合、通常は何らかの方法で処理する必要があります。 たとえば、関連する外部状態を変更して、デフォルトの結果でシーケンスを再開するか、エラーが伝播するようにそのままにしておきます。
3.1. エラー時のアクション
doOnError を使用すると、エラーが発生したときに必要なアクションを呼び出すことができます。
@Test
public void whenChangeStateOnError_thenErrorThrown() {
TestObserver testObserver = new TestObserver();
AtomicBoolean state = new AtomicBoolean(false);
Observable
.error(UNKNOWN_ERROR)
.doOnError(throwable -> state.set(true))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("state should be changed", state.get());
}
アクションの実行中に例外がスローされた場合、RxJavaは例外をCompositeExceptionでラップします。
@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);
testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
3.2. デフォルトのアイテムで再開
doOnError を使用してアクションを呼び出すことはできますが、エラーによって標準のシーケンスフローが中断されます。 デフォルトのオプションを使用してシーケンスを再開したい場合があります。これは、onErrorReturnItemが行うことです。
@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorReturnItem("singleValue")
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("singleValue");
}
動的なデフォルトアイテムサプライヤが望ましい場合は、onErrorReturnを使用できます。
@Test
public void whenHandleOnErrorReturn_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorReturn(Throwable::getMessage)
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("unknown error");
}
3.3. 別のシーケンスで再開
エラーが発生した場合、単一のアイテムにフォールバックする代わりに、onErrorResumeNextを使用してフォールバックデータシーケンスを提供する場合があります。 これは、エラーの伝播を防ぐのに役立ちます。
@Test
public void whenHandleOnErrorResume_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorResumeNext(Observable.just("one", "two"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("one", "two");
}
フォールバックシーケンスが特定の例外タイプによって異なる場合、またはシーケンスを関数で生成する必要がある場合は、関数を onErrorResumeNext:に渡すことができます。
@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> Observable
.just(throwable.getMessage(), "nextValue"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}
3.4. 例外の処理 それだけ
RxJavaは、例外(ただしエラーなし)が発生したときに、提供されたObservableでシーケンスを続行できるようにするフォールバックメソッドも提供します。
@Test
public void whenHandleOnException_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_EXCEPTION)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("exceptionResumed");
}
@Test
public void whenHandleOnException_thenNotResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
}
上記のコードが示すように、エラーが発生した場合、onExceptionResumeNextはシーケンスを再開するために起動しません。
4. エラーで再試行
通常のシーケンスは、一時的なシステム障害またはバックエンドエラーによって中断される可能性があります。 このような状況では、再試行して、シーケンスが修正されるまで待ちます。
幸い、RxJavaには、まさにそれを実行するためのオプションがあります。
4.1. リトライ
retry 、を使用すると、エラーが発生しなくなるまで、Observableが無限に再サブスクライブされます。 ただし、ほとんどの場合、一定量の再試行をお勧めします。
@Test
public void whenRetryOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry(1)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should try twice", atomicCounter.get() == 2);
}
4.2. 条件付きで再試行
条件付き再試行は、述語でretryを使用するかretryUntil を使用して、RxJavaでも実行可能です。
@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(UNKNOWN_ERROR)
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
4.3. RetryWhen
これらの基本的なオプションに加えて、興味深い再試行方法もあります:restartWhen。
これにより、 Observable、 say “ NewO”、が返され、ソース ObservableSource 、たとえば“ OldO”、と同じ値が返されます。返されたObservable「NewO」がonCompleteまたはonErrorを呼び出すと、サブスクライバーのonCompleteまたはonErrorが呼び出されます。
また、“ NewO” がアイテムを放出すると、source ObservableSource “ OldO”への再サブスクリプションがトリガーされます。
以下のテストは、これがどのように機能するかを示しています。
@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
Exception noretryException = new Exception("don't retry");
Observable
.error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> Observable.error(noretryException))
.subscribe(testObserver);
testObserver.assertError(noretryException);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void whenRetryWhenOnError_thenCompleted() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.empty())
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should not retry", atomicCounter.get()==0);
}
@Test
public void whenRetryWhenOnError_thenResubscribed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.just("anything"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should retry once", atomicCounter.get()==1);
}
retryWhen の一般的な使用法は、遅延が可変の制限付き再試行です。
@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
TestObserver testObserver = new TestObserver();
long before = System.currentTimeMillis();
Observable
.error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> throwableObservable
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
.flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
.blockingSubscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
long secondsElapsed = (System.currentTimeMillis() - before)/1000;
assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}
このロジックが3回再試行し、各再試行を段階的に遅らせる方法に注目してください。
5. 概要
この記事では、RxJavaでエラーと例外を処理するいくつかの方法を紹介しました。
エラー処理に関連するRxJava固有の例外もいくつかあります。詳細については、公式ウィキを参照してください。
いつものように、完全な実装はGithubのにあります。