1概要

この記事では、RxJavaで

Observables

を操作するためのユーティリティー演算子と、カスタム演算子の実装方法について説明します。

  • 演算子は上流の

    Observable <T>

    の振る舞いを取って変更し、下流の

    Observable <R>またはSubscriber

    ** を返す関数です。ここで、型TとRは同じである場合も同じでない場合もあります。

オペレータは既存の

Observables

をラップし、通常は購読を傍受することによってそれらを強化します。これは複雑に聞こえるかもしれませんが、実際には非常に柔軟であり、把握するのはそれほど難しくありません。


2

行う


Observableライフサイクルイベントを変更する可能性があるアクションは複数あります。

  • d

    __oOnNext


    演算子は

    Observable

    source


    を変更して、

    onNext




    .

    __と呼ばれたときにアクションを呼び出すようにします。


  • doOnCompleted

    演算子は、結果の

    Observable

    が正常に終了した場合に呼び出される

    アクションを登録し、


    Observer

    ‘s s

    onCompleted

    メソッドを呼び出します。

Observable.range(1, 10)
  .doOnNext(r -> receivedTotal += r)
  .doOnCompleted(() -> result = "Completed")
  .subscribe();

assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));


doOnEach

演算子は、

Observable

ソースを変更して、各アイテムの

Observer

に通知し、アイテムが発行されるたびに呼び出されるコールバックを確立します。


doOnSubscribe

演算子は、

Observer

が結果の

Observable

をサブスクライブするたびに呼び出されるアクションを登録します。


doOnSubscribeの逆を行う

doOnUnsubscribe operator__もあります。

Observable.range(1, 10)
  .doOnEach(new Observer<Integer>() {
      @Override
      public void onCompleted() {
          System.out.println("Complete");
      }
      @Override
      public void onError(Throwable e) {
          e.printStackTrace();
      }
      @Override
      public void onNext(Integer value) {
          receivedTotal += value;
      }
  })
  .doOnSubscribe(() -> result = "Subscribed")
  .subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));


Observable

がエラーで終了したら、

doOnError

演算子を使用してアクションを実行できます。


DoOnTerminate operator

は、Observableが完了したときに呼び出されるアクションを、成功した場合とエラーの場合があります。

thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
  .single()
  .doOnError(throwable -> { throw new RuntimeException("error");})
  .doOnTerminate(() -> result += "doOnTerminate")
  .doAfterTerminate(() -> result += "__doAfterTerminate")
  .subscribe();
assertTrue(result.equals("doOnTerminate__doAfterTerminate"));


  • FinallyDo

    演算子もあります – これは

    doAfterTerminateのために廃止されました。

    Observable__が完了したときにアクションを登録します。**


3

ObserveOn



SubscribeOn


  • デフォルトでは、

    Observable

    と演算子チェーンは、その

    Subscribe

    メソッドが呼び出されたスレッドと同じスレッドで動作します。


_


_operatorは、

Observable



Observersに通知を送信するために使用する別の

Scheduler

を指定します。

Observable.range(1, 5)
  .map(i -> i **  100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .observeOn(Schedulers.computation())
  .map(i -> i **  10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);

要素は

main thread

で生成され、最初の

map

呼び出しまでずっとプッシュされたことがわかります。

しかしその後、

observeOn

は処理を

計算スレッド

にリダイレクトしました。


  • observeOn

    で発生する可能性がある問題の1つは、トップストリームが処理できるよりもボトムストリームが速く排出を生成する可能性があることです。


Observable

がどの

Scheduler

で動作するかを指定するために、

subscribeOn

演算子を使用できます。

Observable.range(1, 5)
  .map(i -> i **  100)
  .doOnNext(i -> {
      emittedTotal += i;
      System.out.println("Emitting " + i
        + " on thread " + Thread.currentThread().getName());
  })
  .subscribeOn(Schedulers.computation())
  .map(i -> i **  10)
  .subscribe(i -> {
      receivedTotal += i;
      System.out.println("Received " + i + " on thread "
        + Thread.currentThread().getName());
  });

Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);


SubscribeOn

は、アイテムの発行に使用するスレッドをソース

Observable

に指示します – このスレッドだけがアイテムを

Subscriber

にプッシュします。

サブスクリプションにのみ影響するため、ストリーム内の任意の場所に配置できます。

事実上、

subscribeOn

を1つだけ使用できますが、

observeOn

演算子はいくつでも使用できます。

observeOn.

を使用すると、スレッド間で排出量を簡単に切り替えることができます。


4

Single



SingleOrDefault


  • 演算子

    Single

    は、ソース

    Observableによって発行された単一のアイテムを発行する

    Observable__を返します。

Observable.range(1, 1)
  .single()
  .subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);

ソース

Observable

が0個以上の要素を生成した場合は、例外がスローされます。

Observable.empty()
  .single()
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

一方、演算子

SingleOrDefault



Singleと非常によく似ています。これは、ソースから単一のアイテムを出力する

Observable__も返すことを意味しますが、さらにデフォルト値を指定できます。

Observable.empty()
  .singleOrDefault("Default")
  .subscribe(i -> result +=i);
assertTrue(result.equals("Default"));

しかし、

Observable

ソースが複数のアイテムを発行しても、__IllegalArgumentExeptionがスローされます。

Observable.range(1, 3)
  .singleOrDefault(5)
  .onErrorReturn(e -> receivedTotal += 10)
  .subscribe();
assertTrue(receivedTotal == 10);

S簡単な結論:

  • ソース

    Observable

    には、何もないか1つ含まれる可能性があると予想される場合

要素、そして

SingleOrDefault

を使用する必要があります
** 私達が私達の中で放出される潜在的に複数のアイテムを扱っているならば


Observable

では、最初または最後の値のみを出力したいので、

__や

__のような他の演算子を使用できます。


5

タイムスタンプ




  • _演算子は、ソース

    Observable_

    ** によって発行された各項目に、独自の順序でその項目を再発行する前にタイムスタンプを付けます。タイムスタンプは、アイテムが発行された時刻を示します。

Observable.range(1, 10)
  .timestamp()
  .map(o -> result = o.getClass().toString() )
  .last()
  .subscribe();

assertTrue(result.equals("class rx.schedulers.Timestamped"));


6.

ディレイ


  • この演算子は、特定の時間間隔で一時停止することによって、そのソース

    オブザーバブル

    を変更します。

提供された値を使用してシーケンス全体をオフセットします。

Observable source = Observable.interval(1, TimeUnit.SECONDS)
  .take(5)
  .timestamp();

Observable delayedObservable
  = source.delay(2, TimeUnit.SECONDS);

source.subscribe(
  value -> System.out.println("source :" + value),
  t -> System.out.println("source error"),
  () -> System.out.println("source completed"));

delayedObservable.subscribe(
  value -> System.out.println("delay : " + value),
  t -> System.out.println("delay error"),
  () -> System.out.println("delay completed"));
Thread.sleep(8000);


delaySubscription

と呼ばれるソースObservableへのサブスクリプションを遅らせることができる代替オペレータがあります。


Delay

演算子は、デフォルトで

computation



Scheduler


に対して実行されますが、

delaySubscription

にオプションの3番目のパラメーターとして渡すことで、別の

Scheduler

を選択できます。


7.

繰り返す



  • Repeat

    は、アップストリームからの完了通知を単純に傍受し、ダウンストリームに渡すのではなく再サブスクライブします。

したがって、

repeat

が同じ一連のイベントを繰り返し実行することが保証されるわけではありませんが、アップストリームが固定ストリームである場合に起こることがあります。

Observable.range(1, 3)
  .repeat(3)
  .subscribe(i -> receivedTotal += i);

assertTrue(receivedTotal == 18);


8

キャッシュ



cache

演算子は

subscribe

とカスタム

Observable

の間にあります。

  • 最初のサブスクライバが現れると、

    cache

    はサブスクリプションを基礎となる

    Observable

    に委任し、すべての通知(イベント、完了、またはエラー)を下流に転送します。

ただし、同時に、すべての通知のコピーが内部に保存されます。後続のサブスクライバがプッシュ通知を受け取りたい場合、

cache

はもはや基盤となる

Observable

に委任しませんが、代わりにキャッシュ値を送ります。

Observable<Integer> source =
  Observable.<Integer>create(subscriber -> {
      System.out.println("Create");
      subscriber.onNext(receivedTotal += 5);
      subscriber.onCompleted();
  }).cache();
source.subscribe(i -> {
  System.out.println("element 1");
  receivedTotal += 1;
});
source.subscribe(i -> {
  System.out.println("element 2");
  receivedTotal += 2;
});

assertTrue(receivedTotal == 8);


9

使用



observer



using()

から返された

Observable

を購読するとき、

Observable

ファクトリ関数を使用して

Observable

を監視します。それを作るためにそれを設計しました。


observer



Observable

から退会するとき、または

Observable

が終了するとき、

using

は3番目の関数を呼び出して、作成されたリソースを破棄します。

Observable<Character> values = Observable.using(
  () -> "resource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result += v,
  e -> result += e
);
assertTrue(result.equals("resource"));


10結論

この記事では、RxJavaユーティリティ演算子の使い方と、最も重要な機能の探求方法について説明しました。

RxJavaの真の力はその演算子にあります。データストリームの宣言型変換は安全でありながら表現力があり、柔軟です。

関数型プログラミングの強力な基盤により、オペレータはRxJavaの採用において決定的な役割を果たします。このライブラリを成功させるには、組み込み演算子を習得することが重要です。

ここで使用されているすべてのコードサンプルを含むプロジェクトの完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/rxjava[Githubに追加]で見つけることができます。