RxJava 2 – 流動性
1前書き
-
RxJavaは、イベント駆動型の非同期アプリケーションを書くことを可能にするReactive Extensions Javaの実装です** RxJavaの使い方の詳細については、/rx-javaを参照してください。
RxJava 2は一から書き直され、複数の新機能が追加されました。そのうちのいくつかは、以前のバージョンのフレームワークに存在していた問題に対する応答として作成されました。
そのような機能の1つは
io.reactivex.Flowable
です。
2
観測可能
vs
_. 流動性
_
RxJavaの以前のバージョンでは、バックプレッシャー対応と非バックプレッシャー対応のソースを扱うための基本クラスは1つしかありませんでした。
RxJava 2では、これら2種類のソース間の明確な区別が導入されました。バックプレッシャー対応のソースは、専用のクラス
Flowable.
を使用して表されるようになりました。
-
観察可能な情報源は背圧をサポートしていません。そのため、私たちは単に消費して影響を及ぼさない情報源としてそれを使用するべきです
また、多数の要素を扱う場合、
Observable
の種類によっては、背圧に関連して考えられる2つの問題が発生する可能性があります。
いわゆる
”
cold
Observable
“を使用した場合、
イベントは遅延して発生しますので、オブザーバーがオーバーフローするのを防ぐことができます。
-
“
hot
_Observable
_
”
** を使うとき、たとえ消費者が追いつくことができないとしても、単にイベントを発行し続けるでしょう。
背圧についての詳細はリンクを見てください:/rxjava-backpressure[ここで、私たちの背圧に焦点を当てた記事で]。
3
Flowable
を作成する
Flowable
を作成する方法はいくつかあります。私たちにとって便利なことに、これらのメソッドはRxJavaの最初のバージョンの
Observable
のメソッドに似ています。
3.1. シンプル
花
Observableの場合と同様に、
just()
メソッドを使用して
Flowable__を作成できます。
Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);
just()
の使用は非常に簡単ですが、静的データから
Flowable
を作成することはあまり一般的ではなく、テスト目的で使用されています。
** 3.2.
観測
から
流動
-
Observable
があるときは、
toFlowable()
メソッドを使って簡単に
Flowable__に変換することができます。
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
.toFlowable(BackpressureStrategy.BUFFER);
変換を実行できるようにするには、
Observable
を__BackpressureStrategyで強化する必要があります。次のセクションで利用可能な戦略について説明します。
3.3.
Flowable
from
FlowableOnSubscribe
RxJava 2では、消費者が購読した後にイベントの発行を開始する
Flowable
を表す機能インタフェース
FlowableOnSubscribe
が導入されました。
そのため、すべてのクライアントが同じイベントのセットを受け取ることになり、これによって
FlowableOnSubscribe
バックプレッシャーが安全になります。
FlowableOnSubscribe
がある場合は、それを使用して
Flowable
を作成できます。
FlowableOnSubscribe<Integer> flowableOnSubscribe
= flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
ドキュメントには、
Flowable.
を作成するためのさらに多くの方法が説明されています。
4
Flowable
BackpressureStrategy
toFlowable()
や
create()
のようなメソッドは
BackpressureStrategy
を引数に取ります。
-
BackpressureStrategy
は列挙型で、
__
Flowableに適用する背圧の動作を定義します。
それはイベントをキャッシュするかドロップするか、あるいはまったく振る舞いをまったく実装しないことができます、最後のケースでは、背圧演算子を使ってそれを定義することに責任があります。
BackpressureStrategy
は、以前のバージョンのRxJavaに存在した
BackpressureMode
と似ています。
RxJava 2には5つの異なる方法があります。
4.1. バッファ
-
BackpressureStrategy.BUFFERを使用すると、
サブスクライバがそれらを消費できるようになるまで、ソースはすべてのイベントをバッファします** :
public void thenAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000)
.boxed()
.collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.BUFFER)
.observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());
assertEquals(testList, receivedInts);
}
Flowableで
onBackpressureBuffer()__メソッドを呼び出すのと似ていますが、バッファサイズやonOverflowアクションを明示的に定義することはできません。
4.2. ドロップ
バッファリングする代わりに、消費できないイベントを破棄するために
BackpressureStrategy.DROP
を使用することができます。
繰り返しますが、これは
Flowable
で
onBackpressureDrop()
を使用するのと同じです。
public void whenDropStrategyUsed__thenOnBackpressureDropped() {
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.computation())
.test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
}
4.3. 最新
-
BackpressureStrategy.LATEST
を使用すると、ソースは最新のイベントのみを保持するように強制されるため、コンシューマが追いつかない場合は以前の値を上書きします。**
public void whenLatestStrategyUsed__thenTheLastElementReceived() {
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.computation())
.test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
}
コードを見ると、
BackpressureStrategy.LATESTとBackpressureStrategy.DROP
はよく似ています。
-
ただし、
BackpressureStrategy.LATEST
は、登録者が処理できない要素を上書きし、最新のものだけを保存するため、名前となります。**
一方、__BackpressureStrategy.DROPは、処理できない要素を破棄します。これは、最新の要素が必ずしも発行されないことを意味します。
4.4. エラー
BackpressureStrategy.ERRORを使用している場合、
単に背圧が発生するとは予想していません** と言っているだけです。したがって、コンシューマがソースに追いつかない場合は、
MissingBackpressureException
がスローされます。
public void whenErrorStrategyUsed__thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.ERROR)
.observeOn(Schedulers.computation())
.test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
4.5. 行方不明
BackpressureStrategy.MISSING
を使用すると、ソースは破棄またはバッファリングせずに要素をプッシュします。
この場合、ダウンストリームはオーバーフローに対処する必要があります。
public void whenMissingStrategyUsed__thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.MISSING)
.observeOn(Schedulers.computation())
.test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
今回のテストでは、
ERROR
と
MISSING
の両方の戦略について、
MissingbackpressureException
を除外します。ソースの内部バッファがオーバーフローすると、どちらもそのような例外をスローします。
ただし、どちらも目的が異なることに注意してください。
背圧がまったく予想されない場合は前者を使用し、発生した場合にはソースから例外をスローするようにします。
Flowable
の作成時にデフォルトの動作を指定したくない場合は、後者の方法を使用できます。そして、後で定義するためにバックプレッシャー演算子を使用します。
5概要
このチュートリアルでは、RxJava
2
で導入された
Flowable.
という新しいクラスを紹介しました。
Flowable
自体とそのAPIの詳細については、http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html[ドキュメント]を参照してください。
いつものように、すべてのコードサンプルはhttps://github.com/eugenp/tutorials/tree/master/rxjava-2[GitHubに載って]を見つけることができます。