1. 概要

この記事では、RxJavaで Observables を操作するためのいくつかのユーティリティ演算子と、カスタムのものを実装する方法について説明します。

演算子は、アップストリームのObservableの動作を取得および変更する関数です。 ダウンストリームのObservableを返しますまたはサブスクライバー 、タイプTとRは同じである場合と同じでない場合があります。

オペレーターは、既存の Observables をラップし、通常、サブスクリプションをインターセプトすることによってそれらを拡張します。 これは複雑に聞こえるかもしれませんが、実際には非常に柔軟で、把握するのはそれほど難しくありません。

2. 実行

監視可能なライフサイクルイベントを変更する可能性のある複数のアクションがあります。

d oOnNext オペレーターは、 Observable source を変更して、onNextが呼び出されたときにアクションを呼び出すようにします。

doOnCompletedオペレーターは、結果のObservableが正常に終了した場合に呼び出されるアクションを登録しObserveronCompletedメソッドを呼び出します。

Observable.range(1, 10)
  .doOnNext(r -> receivedTotal += r)
  .doOnCompleted(() -> result = "Completed")
  .subscribe();
 
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));

doOnEach オペレーターは、 Observable ソースを変更して、各アイテムの Observer に通知し、アイテムが発行されるたびに呼び出されるコールバックを確立します。

doOnSubscribe オペレーターは、Observerが結果のObservableをサブスクライブするたびに呼び出されるアクションを登録します。

doOnSubscribe:の反対を行うdoOnUnsubscribe演算子もあります。

Observable.range(1, 10)
  .doOnEach(new Observer<Integer>() {
      @Override
      public void onCompleted() {
          System.out.println("Complete");
      }
      @Override
      public void onError(Throwable e) {
          e.printStackTrace();
      }
      @Override
      public void onNext(Integer value) {
          receivedTotal += value;
      }
  })
  .doOnSubscribe(() -> result = "Subscribed")
  .subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));

Observable がエラーで完了した場合、doOnError演算子を使用してアクションを実行できます。

DoOnTerminateオペレーターは、Observableが正常にまたはエラーで完了したときに呼び出されるアクションを登録します。

thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
  .single()
  .doOnError(throwable -> { throw new RuntimeException("error");})
  .doOnTerminate(() -> result += "doOnTerminate")
  .doAfterTerminate(() -> result += "_doAfterTerminate")
  .subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));

もあります FinalDo演算子–doAfterTerminateを優先して非推奨になりました。 Observableが完了するとアクションを登録します。

3. ObserveOnSubscribeOn

デフォルトでは、 Observable と演算子チェーンは、Subscribeメソッドが呼び出されたのと同じスレッドで動作します。

ObserveOn オペレーターは、ObservableObserversに通知を送信するために使用する別のSchedulerを指定します:

Observable.range(1, 5)
  .map(i -> i * 100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .observeOn(Schedulers.computation())
  .map(i -> i * 10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

要素がメインスレッドで生成され、最初のmap呼び出しまでプッシュされたことがわかります。

しかしその後、observeOnは処理を計算スレッドにリダイレクトしました。これはマップと最後のサブスクライバーを処理するときに使用されました。

observeOnで発生する可能性のある問題の1つは、ボトムストリームがトップストリームが処理できるよりも速く排出を生成する可能性があることです。これにより、背圧で問題が発生する可能性があります。

スケジューラーObservableが動作するかを指定するには、subscribeOn演算子を使用できます。

Observable.range(1, 5)
  .map(i -> i * 100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .subscribeOn(Schedulers.computation())
  .map(i -> i * 10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

SubscribeOn は、ソース Observable にアイテムの発行に使用するスレッドを指示します。このスレッドのみが、アイテムをSubscriberにプッシュします。 サブスクリプションにのみ影響するため、ストリーム内の任意の場所に配置できます。

事実上、使用できる subscribeOn は1つだけですが、observeOn演算子はいくつでも使用できます。 observeOn。を使用すると、エミッションをあるスレッドから別のスレッドに簡単に切り替えることができます。

4. シングルおよびシングルまたはデフォルト

演算子Singleは、ソース Observable:によって発行された単一のアイテムを発行するObservableを返します。

Observable.range(1, 1)
  .single()
  .subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);

ソースObservableがゼロまたは複数の要素を生成する場合、例外がスローされます。

Observable.empty()
  .single()
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

一方、演算子 SingleOrDefault は、 Singleと非常によく似ています。つまり、は、ソースから単一のアイテムを放出する Observable も返しますが、さらに、デフォルト値を指定できます。

Observable.empty()
  .singleOrDefault("Default")
  .subscribe(i -> result +=i);
assertTrue(result.equals("Default"));

ただし、 Observable ソースが複数のアイテムを発行する場合でも、 IllegalArgumentExeption:をスローします。

Observable.range(1, 3)
  .singleOrDefault(5)
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

S簡単な結論:

  • ソースObservableに要素がないか、1つあると予想される場合は、SingleOrDefaultを使用する必要があります
  • Observable で放出される可能性のある複数のアイテムを処理していて、最初または最後の値のみを放出したい場合は、first最後

5. タイムスタンプ

Timestampオペレーターは、ソースObservable によって発行された各アイテムにタイムスタンプを付加してから、そのアイテムを独自のシーケンスで再送信します。 タイムスタンプは、アイテムが発行された時刻を示します。

Observable.range(1, 10)
  .timestamp()
  .map(o -> result = o.getClass().toString() )
  .last()
  .subscribe();
 
assertTrue(result.equals("class rx.schedulers.Timestamped"));

6. 遅延

このオペレーターは、ソース Observableのアイテムのそれぞれを発行する前に、特定の時間増分で一時停止することにより、ソースObservableを変更します。

提供された値を使用してシーケンス全体をオフセットします。

Observable source = Observable.interval(1, TimeUnit.SECONDS)
  .take(5)
  .timestamp();

Observable delayedObservable
  = source.delay(2, TimeUnit.SECONDS);

source.subscribe(
  value -> System.out.println("source :" + value),
  t -> System.out.println("source error"),
  () -> System.out.println("source completed"));

delayedObservable.subscribe(
  value -> System.out.println("delay : " + value),
  t -> System.out.println("delay error"),
  () -> System.out.println("delay completed"));
Thread.sleep(8000);

delaySubscriptionと呼ばれるソースObservableへのサブスクリプションを遅らせることができる代替演算子があります。

Delay演算子はデフォルトでcomputation Scheduler で実行されますが、オプションの3番目として渡すことで別のSchedulerを選択できますdelaySubscriptionへのパラメーター。

7. 繰り返し

Repeat は、アップストリームからの完了通知をインターセプトするだけで、ダウンストリームに渡すのではなく、再サブスクライブします。

したがって、 repeat が同じ一連のイベントを循環し続けることは保証されませんが、アップストリームが固定ストリームの場合はたまたま次のようになります。

Observable.range(1, 3)
  .repeat(3)
  .subscribe(i -> receivedTotal += i);
 
assertTrue(receivedTotal == 18);

8. キャッシュ

キャッシュオペレーターは、subscribeとカスタムObservableの間にあります。

最初のサブスクライバーが表示されると、 cacheはサブスクリプションを基盤となるObservableに委任し、すべての通知(イベント、完了、またはエラー)をダウンストリームに転送します。

ただし、同時に、すべての通知のコピーを内部に保持します。 後続のサブスクライバーがプッシュされた通知を受信する場合、 cacheは基になるObservableに委任せず、代わりにキャッシュされた値をフィードします。

Observable<Integer> source =
  Observable.<Integer>create(subscriber -> {
      System.out.println("Create");
      subscriber.onNext(receivedTotal += 5);
      subscriber.onCompleted();
  }).cache();
source.subscribe(i -> {
  System.out.println("element 1");
  receivedTotal += 1;
});
source.subscribe(i -> {
  System.out.println("element 2");
  receivedTotal += 2;
});
 
assertTrue(receivedTotal == 8);

9. 使用

observerusing()から返された Observable をサブスクライブする場合、Observableファクトリ関数を使用して Observable observer は…監視すると同時に、リソースファクトリ関数を使用して、設計したリソースを作成します。

observerObservableのサブスクライブを解除するとき、または Observable が終了すると、usingは3番目の関数を呼び出して作成されたリソースを破棄します:

Observable<Character> values = Observable.using(
  () -> "resource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result += v,
  e -> result += e
);
assertTrue(result.equals("resource"));

10. 結論

この記事では、RxJavaユーティリティ演算子の使用方法と、それらの最も重要な機能を調べる方法について説明しました。

RxJavaの真の力は、その演算子にあります。 データストリームの宣言型変換は、安全でありながら表現力と柔軟性があります。

関数型プログラミングの強力な基盤により、オペレーターはRxJavaの採用において決定的な役割を果たします。 組み込みの演算子を習得することは、このライブラリで成功するための鍵です。

ここで使用されているすべてのコードサンプルを含むプロジェクトの完全なソースコードは、GitHubにあります。