RxJavaで観測可能なユーティリティ演算子
1概要
この記事では、RxJavaで
Observables
を操作するためのユーティリティー演算子と、カスタム演算子の実装方法について説明します。
-
演算子は上流の
Observable <T>
の振る舞いを取って変更し、下流の
Observable <R>またはSubscriber
** を返す関数です。ここで、型TとRは同じである場合も同じでない場合もあります。
オペレータは既存の
Observables
をラップし、通常は購読を傍受することによってそれらを強化します。これは複雑に聞こえるかもしれませんが、実際には非常に柔軟であり、把握するのはそれほど難しくありません。
2
行う
Observableライフサイクルイベントを変更する可能性があるアクションは複数あります。
-
d
__oOnNext
演算子は
Observable
source
を変更して、
onNext
が
.
__と呼ばれたときにアクションを呼び出すようにします。 -
doOnCompleted
演算子は、結果の
Observable
が正常に終了した場合に呼び出される
アクションを登録し、
Observer
‘s s
onCompleted
メソッドを呼び出します。
Observable.range(1, 10)
.doOnNext(r -> receivedTotal += r)
.doOnCompleted(() -> result = "Completed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));
doOnEach
演算子は、
Observable
ソースを変更して、各アイテムの
Observer
に通知し、アイテムが発行されるたびに呼び出されるコールバックを確立します。
doOnSubscribe
演算子は、
Observer
が結果の
Observable
をサブスクライブするたびに呼び出されるアクションを登録します。
doOnSubscribeの逆を行う
doOnUnsubscribe operator__もあります。
Observable.range(1, 10)
.doOnEach(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
receivedTotal += value;
}
})
.doOnSubscribe(() -> result = "Subscribed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));
Observable
がエラーで終了したら、
doOnError
演算子を使用してアクションを実行できます。
DoOnTerminate operator
は、Observableが完了したときに呼び出されるアクションを、成功した場合とエラーの場合があります。
thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
.single()
.doOnError(throwable -> { throw new RuntimeException("error");})
.doOnTerminate(() -> result += "doOnTerminate")
.doAfterTerminate(() -> result += "__doAfterTerminate")
.subscribe();
assertTrue(result.equals("doOnTerminate__doAfterTerminate"));
-
FinallyDo
演算子もあります – これは
doAfterTerminateのために廃止されました。
Observable__が完了したときにアクションを登録します。**
3
ObserveOn
と
SubscribeOn
-
デフォルトでは、
Observable
と演算子チェーンは、その
Subscribe
メソッドが呼び出されたスレッドと同じスレッドで動作します。
_
_operatorは、
Observable
が
Observersに通知を送信するために使用する別の
Scheduler
を指定します。
Observable.range(1, 5)
.map(i -> i ** 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.observeOn(Schedulers.computation())
.map(i -> i ** 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
要素は
main thread
で生成され、最初の
map
呼び出しまでずっとプッシュされたことがわかります。
しかしその後、
observeOn
は処理を
計算スレッド
にリダイレクトしました。
-
observeOn
で発生する可能性がある問題の1つは、トップストリームが処理できるよりもボトムストリームが速く排出を生成する可能性があることです。
Observable
がどの
Scheduler
で動作するかを指定するために、
subscribeOn
演算子を使用できます。
Observable.range(1, 5)
.map(i -> i ** 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.computation())
.map(i -> i ** 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
SubscribeOn
は、アイテムの発行に使用するスレッドをソース
Observable
に指示します – このスレッドだけがアイテムを
Subscriber
にプッシュします。
サブスクリプションにのみ影響するため、ストリーム内の任意の場所に配置できます。
事実上、
subscribeOn
を1つだけ使用できますが、
observeOn
演算子はいくつでも使用できます。
observeOn.
を使用すると、スレッド間で排出量を簡単に切り替えることができます。
4
Single
と
SingleOrDefault
-
演算子
Single
は、ソース
Observableによって発行された単一のアイテムを発行する
Observable__を返します。
Observable.range(1, 1)
.single()
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);
ソース
Observable
が0個以上の要素を生成した場合は、例外がスローされます。
Observable.empty()
.single()
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
一方、演算子
SingleOrDefault
は
Singleと非常によく似ています。これは、ソースから単一のアイテムを出力する
Observable__も返すことを意味しますが、さらにデフォルト値を指定できます。
Observable.empty()
.singleOrDefault("Default")
.subscribe(i -> result +=i);
assertTrue(result.equals("Default"));
しかし、
Observable
ソースが複数のアイテムを発行しても、__IllegalArgumentExeptionがスローされます。
Observable.range(1, 3)
.singleOrDefault(5)
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
S簡単な結論:
-
ソース
Observable
には、何もないか1つ含まれる可能性があると予想される場合
要素、そして
SingleOrDefault
を使用する必要があります
** 私達が私達の中で放出される潜在的に複数のアイテムを扱っているならば
Observable
では、最初または最後の値のみを出力したいので、
__や
__のような他の演算子を使用できます。
5
タイムスタンプ
-
_演算子は、ソース
Observable_
** によって発行された各項目に、独自の順序でその項目を再発行する前にタイムスタンプを付けます。タイムスタンプは、アイテムが発行された時刻を示します。
Observable.range(1, 10)
.timestamp()
.map(o -> result = o.getClass().toString() )
.last()
.subscribe();
assertTrue(result.equals("class rx.schedulers.Timestamped"));
6.
ディレイ
-
この演算子は、特定の時間間隔で一時停止することによって、そのソース
オブザーバブル
を変更します。
提供された値を使用してシーケンス全体をオフセットします。
Observable source = Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.timestamp();
Observable delayedObservable
= source.delay(2, TimeUnit.SECONDS);
source.subscribe(
value -> System.out.println("source :" + value),
t -> System.out.println("source error"),
() -> System.out.println("source completed"));
delayedObservable.subscribe(
value -> System.out.println("delay : " + value),
t -> System.out.println("delay error"),
() -> System.out.println("delay completed"));
Thread.sleep(8000);
delaySubscription
と呼ばれるソースObservableへのサブスクリプションを遅らせることができる代替オペレータがあります。
Delay
演算子は、デフォルトで
computation
Scheduler
に対して実行されますが、
delaySubscription
にオプションの3番目のパラメーターとして渡すことで、別の
Scheduler
を選択できます。
7.
繰り返す
-
Repeat
は、アップストリームからの完了通知を単純に傍受し、ダウンストリームに渡すのではなく再サブスクライブします。
したがって、
repeat
が同じ一連のイベントを繰り返し実行することが保証されるわけではありませんが、アップストリームが固定ストリームである場合に起こることがあります。
Observable.range(1, 3)
.repeat(3)
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 18);
8
キャッシュ
cache
演算子は
subscribe
とカスタム
Observable
の間にあります。
-
最初のサブスクライバが現れると、
cache
はサブスクリプションを基礎となる
Observable
に委任し、すべての通知(イベント、完了、またはエラー)を下流に転送します。
ただし、同時に、すべての通知のコピーが内部に保存されます。後続のサブスクライバがプッシュ通知を受け取りたい場合、
cache
はもはや基盤となる
Observable
に委任しませんが、代わりにキャッシュ値を送ります。
Observable<Integer> source =
Observable.<Integer>create(subscriber -> {
System.out.println("Create");
subscriber.onNext(receivedTotal += 5);
subscriber.onCompleted();
}).cache();
source.subscribe(i -> {
System.out.println("element 1");
receivedTotal += 1;
});
source.subscribe(i -> {
System.out.println("element 2");
receivedTotal += 2;
});
assertTrue(receivedTotal == 8);
9
使用
observer
が
using()
から返された
Observable
を購読するとき、
Observable
ファクトリ関数を使用して
Observable
を監視します。それを作るためにそれを設計しました。
observer
が
Observable
から退会するとき、または
Observable
が終了するとき、
using
は3番目の関数を呼び出して、作成されたリソースを破棄します。
Observable<Character> values = Observable.using(
() -> "resource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result += v,
e -> result += e
);
assertTrue(result.equals("resource"));
10結論
この記事では、RxJavaユーティリティ演算子の使い方と、最も重要な機能の探求方法について説明しました。
RxJavaの真の力はその演算子にあります。データストリームの宣言型変換は安全でありながら表現力があり、柔軟です。
関数型プログラミングの強力な基盤により、オペレータはRxJavaの採用において決定的な役割を果たします。このライブラリを成功させるには、組み込み演算子を習得することが重要です。
ここで使用されているすべてのコードサンプルを含むプロジェクトの完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/rxjava[Githubに追加]で見つけることができます。