1前書き

  • RxJavaは、イベント駆動型の非同期アプリケーションを書くことを可能にするReactive Extensions Javaの実装です** RxJavaの使い方の詳細については、/rx-javaを参照してください。

RxJava 2は一から書き直され、複数の新機能が追加されました。そのうちのいくつかは、以前のバージョンのフレームワークに存在していた問題に対する応答として作成されました。

そのような機能の1つは


io.reactivex.Flowable


です。


2

観測可能

vs


_. 流動性

_


RxJavaの以前のバージョンでは、バックプレッシャー対応と非バックプレッシャー対応のソースを扱うための基本クラスは1つしかありませんでした。

RxJava 2では、これら2種類のソース間の明確な区別が導入されました。バックプレッシャー対応のソースは、専用のクラス

Flowable.

を使用して表されるようになりました。

  • 観察可能な情報源は背圧をサポートしていません。そのため、私たちは単に消費して影響を及ぼさない情報源としてそれを使用するべきです

また、多数の要素を扱う場合、

Observable

の種類によっては、背圧に関連して考えられる2つの問題が発生する可能性があります。

いわゆる





cold

Observable


“を使用した場合、



イベントは遅延して発生しますので、オブザーバーがオーバーフローするのを防ぐことができます。





  • hot


    _Observable

    _





    ** を使うとき、たとえ消費者が追いつくことができないとしても、単にイベントを発行し続けるでしょう。

背圧についての詳細はリンクを見てください:/rxjava-backpressure[ここで、私たちの背圧に焦点を当てた記事で]。


3

Flowable


を作成する


Flowable

を作成する方法はいくつかあります。私たちにとって便利なことに、これらのメソッドはRxJavaの最初のバージョンの

Observable

のメソッドに似ています。


3.1. シンプル





Observableの場合と同様に、

just()

メソッドを使用して

Flowable__を作成できます。

Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);


just()

の使用は非常に簡単ですが、静的データから

Flowable

を作成することはあまり一般的ではなく、テスト目的で使用されています。

** 3.2.

観測

から

流動

  • Observable

    があるときは、

    toFlowable()

    メソッドを使って簡単に

    Flowable__に変換することができます。

Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
  .toFlowable(BackpressureStrategy.BUFFER);

変換を実行できるようにするには、

Observable

を__BackpressureStrategyで強化する必要があります。次のセクションで利用可能な戦略について説明します。


3.3.

Flowable

from

FlowableOnSubscribe


RxJava 2では、消費者が購読した後にイベントの発行を開始する

Flowable

を表す機能インタフェース

FlowableOnSubscribe

が導入されました。

そのため、すべてのクライアントが同じイベントのセットを受け取ることになり、これによって

FlowableOnSubscribe

バックプレッシャーが安全になります。


FlowableOnSubscribe

がある場合は、それを使用して

Flowable

を作成できます。

FlowableOnSubscribe<Integer> flowableOnSubscribe
 = flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
  .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

ドキュメントには、

Flowable.

を作成するためのさらに多くの方法が説明されています。


4

Flowable


BackpressureStrategy



toFlowable()



create()

のようなメソッドは

BackpressureStrategy

を引数に取ります。


  • BackpressureStrategy

    は列挙型で、

    __

    Flowableに適用する背圧の動作を定義します。

それはイベントをキャッシュするかドロップするか、あるいはまったく振る舞いをまったく実装しないことができます、最後のケースでは、背圧演算子を使ってそれを定義することに責任があります。


BackpressureStrategy

は、以前のバージョンのRxJavaに存在した

BackpressureMode

と似ています。

RxJava 2には5つの異なる方法があります。


4.1. バッファ


  • BackpressureStrategy.BUFFERを使用すると、

    サブスクライバがそれらを消費できるようになるまで、ソースはすべてのイベントをバッファします** :

public void thenAllValuesAreBufferedAndReceived() {
    List testList = IntStream.range(0, 100000)
      .boxed()
      .collect(Collectors.toList());

    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.BUFFER)
      .observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertEquals(testList, receivedInts);
}


Flowableで

onBackpressureBuffer()__メソッドを呼び出すのと似ていますが、バッファサイズやonOverflowアクションを明示的に定義することはできません。


4.2. ドロップ

バッファリングする代わりに、消費できないイベントを破棄するために

BackpressureStrategy.DROP

を使用することができます。

繰り返しますが、これは

Flowable



onBackpressureDrop()

を使用するのと同じです。

public void whenDropStrategyUsed__thenOnBackpressureDropped() {

    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.DROP)
      .observeOn(Schedulers.computation())
      .test();
    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(!receivedInts.contains(100000));
 }


4.3. 最新


  • BackpressureStrategy.LATEST

    を使用すると、ソースは最新のイベントのみを保持するように強制されるため、コンシューマが追いつかない場合は以前の値を上書きします。**

public void whenLatestStrategyUsed__thenTheLastElementReceived() {

    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.LATEST)
      .observeOn(Schedulers.computation())
      .test();

    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(receivedInts.contains(100000));
 }

コードを見ると、

BackpressureStrategy.LATESTとBackpressureStrategy.DROP

はよく似ています。

  • ただし、

    BackpressureStrategy.LATEST

    は、登録者が処理できない要素を上書きし、最新のものだけを保存するため、名前となります。**

一方、__BackpressureStrategy.DROPは、処理できない要素を破棄します。これは、最新の要素が必ずしも発行されないことを意味します。


4.4. エラー


BackpressureStrategy.ERRORを使用している場合、

単に背圧が発生するとは予想していません** と言っているだけです。したがって、コンシューマがソースに追いつかない場合は、

MissingBackpressureException

がスローされます。

public void whenErrorStrategyUsed__thenExceptionIsThrown() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.ERROR)
      .observeOn(Schedulers.computation())
      .test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}


4.5. 行方不明


BackpressureStrategy.MISSING

を使用すると、ソースは破棄またはバッファリングせずに要素をプッシュします。

この場合、ダウンストリームはオーバーフローに対処する必要があります。

public void whenMissingStrategyUsed__thenException() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.MISSING)
      .observeOn(Schedulers.computation())
      .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

今回のテストでは、

ERROR



MISSING

の両方の戦略について、

MissingbackpressureException

を除外します。ソースの内部バッファがオーバーフローすると、どちらもそのような例外をスローします。

ただし、どちらも目的が異なることに注意してください。

背圧がまったく予想されない場合は前者を使用し、発生した場合にはソースから例外をスローするようにします。


Flowable

の作成時にデフォルトの動作を指定したくない場合は、後者の方法を使用できます。そして、後で定義するためにバックプレッシャー演算子を使用します。


5概要

このチュートリアルでは、RxJava

2

で導入された

Flowable.

という新しいクラスを紹介しました。


Flowable

自体とそのAPIの詳細については、http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html[ドキュメント]を参照してください。

いつものように、すべてのコードサンプルはhttps://github.com/eugenp/tutorials/tree/master/rxjava-2[GitHubに載って]を見つけることができます。