1. 序章

この記事では、RxJavaを使用して例外とエラーを処理する方法を見ていきます。

まず、Observableは通常例外をスローしないことに注意してください。 代わりに、デフォルトでは、 ObservableObserverのonError()メソッドを呼び出し、回復不能なエラーが発生したことをオブザーバーに通知し、Observerのメソッド。

これから紹介するエラー処理演算子は、 Observable シーケンスを再開または再試行することにより、デフォルトの動作を変更します。

2. Mavenの依存関係

まず、pom.xmlにRxJavaを追加しましょう。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.3</version>
</dependency>

アーティファクトの最新バージョンはここにあります。

3. エラー処理

エラーが発生した場合、通常は何らかの方法で処理する必要があります。 たとえば、関連する外部状態を変更して、デフォルトの結果でシーケンスを再開するか、エラーが伝播するようにそのままにしておきます。

3.1. エラー時のアクション

doOnError を使用すると、エラーが発生したときに必要なアクションを呼び出すことができます。

@Test
public void whenChangeStateOnError_thenErrorThrown() {
    TestObserver testObserver = new TestObserver();
    AtomicBoolean state = new AtomicBoolean(false);
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> state.set(true))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
 
    assertTrue("state should be changed", state.get());
}

アクションの実行中に例外がスローされた場合、RxJavaは例外をCompositeExceptionでラップします。

@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> {
          throw new RuntimeException("unexcepted");
      })
      .subscribe(testObserver);

    testObserver.assertError(CompositeException.class);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

3.2. デフォルトのアイテムで再開

doOnError を使用してアクションを呼び出すことはできますが、エラーによって標準のシーケンスフローが中断されます。 デフォルトのオプションを使用してシーケンスを再開したい場合があります。これは、onErrorReturnItemが行うことです。

@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturnItem("singleValue")
      .subscribe(testObserver);
 
    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("singleValue");
}

動的なデフォルトアイテムサプライヤが望ましい場合は、onErrorReturnを使用できます。

@Test
public void whenHandleOnErrorReturn_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturn(Throwable::getMessage)
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("unknown error");
}

3.3. 別のシーケンスで再開

エラーが発生した場合、単一のアイテムにフォールバックする代わりに、onErrorResumeNextを使用してフォールバックデータシーケンスを提供する場合があります。 これは、エラーの伝播を防ぐのに役立ちます。

@Test
public void whenHandleOnErrorResume_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(Observable.just("one", "two"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("one", "two");
}

フォールバックシーケンスが特定の例外タイプによって異なる場合、またはシーケンスを関数で生成する必要がある場合は、関数を onErrorResumeNext:に渡すことができます。

@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(throwable -> Observable
        .just(throwable.getMessage(), "nextValue"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("unknown error", "nextValue");
}

3.4. 例外の処理 それだけ

RxJavaは、例外(ただしエラーなし)が発生したときに、提供されたObservableでシーケンスを続行できるようにするフォールバックメソッドも提供します。

@Test
public void whenHandleOnException_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_EXCEPTION)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("exceptionResumed");
}

@Test
public void whenHandleOnException_thenNotResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
}

上記のコードが示すように、エラーが発生した場合、onExceptionResumeNextはシーケンスを再開するために起動しません。

4. エラーで再試行

通常のシーケンスは、一時的なシステム障害またはバックエンドエラーによって中断される可能性があります。 このような状況では、再試行して、シーケンスが修正されるまで待ちます。

幸い、RxJavaには、まさにそれを実行するためのオプションがあります。

4.1. リトライ

retry を使用すると、エラーが発生しなくなるまで、Observableが無限に再サブスクライブされます。 ただし、ほとんどの場合、一定量の再試行をお勧めします。

@Test
public void whenRetryOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry(1)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should try twice", atomicCounter.get() == 2);
}

4.2. 条件付きで再試行

条件付き再試行は、述語retryを使用するかretryUntil を使用して、RxJavaでも実行可能です。

@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry((integer, throwable) -> integer < 4)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(UNKNOWN_ERROR)
      .retryUntil(() -> atomicCounter.incrementAndGet() > 3)
      .subscribe(testObserver);
    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

4.3. RetryWhen

これらの基本的なオプションに加えて、興味深い再試行方法もあります:restartWhen

これにより、 Observable、 say “ NewO”、が返され、ソース ObservableSource 、たとえば“ OldO”、と同じ値が返されます。返されたObservable「NewO」がonCompleteまたはonErrorを呼び出すと、サブスクライバーのonCompleteまたはonErrorが呼び出されます。

また、“ NewO” がアイテムを放出すると、source ObservableSource “ OldO”への再サブスクリプションがトリガーされます。

以下のテストは、これがどのように機能するかを示しています。

@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    Exception noretryException = new Exception("don't retry");
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> Observable.error(noretryException))
      .subscribe(testObserver);

    testObserver.assertError(noretryException);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

@Test
public void whenRetryWhenOnError_thenCompleted() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.empty())
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should not retry", atomicCounter.get()==0);
}

@Test
public void whenRetryWhenOnError_thenResubscribed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.just("anything"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should retry once", atomicCounter.get()==1);
}

retryWhen の一般的な使用法は、遅延が可変の制限付き再試行です。

@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
    TestObserver testObserver = new TestObserver();
    long before = System.currentTimeMillis();
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> throwableObservable
        .zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
        .flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
      .blockingSubscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    long secondsElapsed = (System.currentTimeMillis() - before)/1000;
    assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}

このロジックが3回再試行し、各再試行を段階的に遅らせる方法に注目してください。

5. 概要

この記事では、RxJavaでエラーと例外を処理するいくつかの方法を紹介しました。

エラー処理に関連するRxJava固有の例外もいくつかあります。詳細については、公式ウィキを参照してください。

いつものように、完全な実装はGithubにあります。