RxJavaで背圧に対処する
1概要
この記事では、https://github.com/ReactiveX/RxJava[RxJava library]がバックプレッシャーを処理するのに役立つ方法を見ていきます。
簡単に言うと、RxJavaは、1つまたは複数の
Observers
がサブスクライブできる
Observables
を導入することによって、リアクティブストリームの概念を利用します。私たちは背圧の問題に直面する必要があるので、おそらく無限のストリームを扱うことは非常に挑戦的です。
Observable
がアイテムを消費者よりも早く放出している状況に入るのは難しくありません。未消費アイテムのバッファが増大するという問題に対するさまざまな解決策を見ていきます。
** 2ホット
オブザーバブル
とコールド
オブザーバブル
まず、後で定義する
Observables
の要素のコンシューマとして使用される簡単なコンシューマ関数を作成しましょう。
public class ComputeFunction {
public static void compute(Integer v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
私たちの
compute()
関数は単に引数を出力しています。ここで気をつけるべき重要なことは
Thread.sleep(1000)
メソッドの呼び出しです – 私たちは
Observable
がそれらを消費することができるより早くアイテムでいっぱいになるいくつかの長期実行タスクをエミュレートするためにそれをしています。
背圧処理に関しては全く異なる2つのタイプの
オブザーバブル – Hot
と
Cold
– があります。
2.1. 冷たい
観測値
コールド
オブザーバブル
は特定のシーケンスのアイテムを発行しますが、その
オブザーバーが便利であると判断したときに、そしてシーケンスの完全性を損なうことなく
オブザーバーが望むレートで、このシーケンスを発行し始めることができます。 ** 冷たい
観察可能
は、怠惰な方法でアイテムを提供しています。
Observer
は、そのアイテムを処理する準備が整ったときにのみ要素を取得します。アイテムはプル形式で要求されるため、
Observable
にバッファする必要はありません。
たとえば、100万から100万までの静的な要素の範囲に基づいて
Observable
を作成した場合、それらの項目がどれほど頻繁に監視されていても、その
Observable
は同じ一連の項目を出力します。
Observable.range(1, 1__000__000)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute);
プログラムを起動すると、項目は
Observer
lazilyによって計算され、プル方式で要求されます。
Schedulers.computation()
メソッドは、
RxJavaの計算スレッドプール内で
Observer__を実行したいことを意味します。
プログラムの出力は、
Observable
から1つずつ項目に対して呼び出された
compute()
メソッドの結果で構成されます。
compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...
冷たい
オブザーバブル
は引っ張って働くので、どんな形の背圧も必要としません。コールド
オブザーバブル
によって発行されたアイテムの例には、データベースクエリ、ファイル検索、またはWeb要求の結果が含まれます。
2.2. ホット
観測値
ホット
オブザーバブル
はアイテムの生成を開始し、それらが作成されるとすぐにそれらを放出します。これはCold Observables Pullモデルの処理とは反対です。 ** ホット
オブザーバブル
はそれ自身のペースでアイテムを放出します。そして、追いつくのはオブザーバー次第です。
Observer
が
Observable
によって生成されるのと同じくらい速くアイテムを消費することができないとき、それらはメモリをいっぱいにするので、バッファリングされるか、または他の方法で処理される必要があります。
ホットなObservableの例を考えてみましょう。これは、アイテムを処理しているエンドユーザーに100万のアイテムを生産しているというものです。
Observer
の
compute()
メソッドがすべての項目を処理するのに時間がかかる場合、
Observable
は項目でメモリをいっぱいにし始めているため、プログラムが失敗します。
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
IntStream.range(1, 1__000__000).forEach(source::onNext);
Observable
の過剰生成を処理する方法を定義していないため、そのプログラムの実行は
MissingBackpressureException
で失敗します。
熱い
観測可能な
によって放出されるアイテムの例はマウスを含むかもしれません
3バッファリング過剰生成
Observable
Observable
の過剰生成を処理する最初の方法は、__Observerによって処理できない要素に対してある種のバッファを定義することです。
それを行うには
buffer()
メソッドを呼び出します。
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
サイズが1024のバッファを定義すると、
Observer
が過剰生産のソースに追いつくまでにしばらく時間がかかります。バッファはまだ処理されていないアイテムを格納します。
生成された値に十分なスペースを確保するためにバッファサイズを増やすことができます。
ただし、一般に、ソースが予測バッファサイズを過剰生成した場合でもオーバーフローが発生する可能性があるため、これは一時的な修正にすぎない可能性があります。
4発行アイテムのバッチ処理
N要素のウィンドウで、過剰生産されたアイテムをバッチ処理できます。
Observable
が
Observer
がそれらを処理することができるより速く要素を生産しているとき、私たちは生産された要素を一緒にグループ化して要素ごとにではなく要素の集合を処理することができる
Observer
にバッチ要素を送ることによってこれを軽減できます:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.window(500)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
引数
500を指定して
window()
メソッドを使用すると、
Observable
は要素を500サイズのバッチにグループ化します。
Observer
が要素を一つずつ処理するよりも速く要素のバッチを処理することができるとき、この技術は
Observable__を過剰生産する問題を減らすことができます。
5スキップ要素
Observable
によって生成された値の一部を安全に無視できる場合は、特定の時間内にサンプリングを使用して演算子を調整できます。
メソッド
sample()
と
throttleFirst()
はdurationをパラメータとして取ります。
-
s
__ample()
__メソッドは定期的に次のシーケンスを調べます。
期間内に生産された最後のアイテムを要素化して放出します
パラメータとして指定
**
throttleFirst()
メソッドは生成された最初のアイテムを発行します
パラメータとして指定された期間の後
期間は、生成された要素のシーケンスから1つの特定の要素が選択されるまでの時間です。要素をスキップすることで背圧を処理するための戦略を指定できます。
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.sample(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
要素をスキップする戦略は
sample()
メソッドになることを指定しました。 100ミリ秒の長さのシーケンスのサンプルが必要です。
その要素は__Observerに出力されます。
ただし、これらの演算子はダウンストリームの
Observer
による値の受信率を下げるため、
MissingBackpressureException
が発生する可能性があります。
6. 満杯__観測バッファ
の取り扱い
要素のサンプリングやバッチ処理の戦略がバッファを埋めるのに役立たない場合は、バッファが埋まったときにケースを処理する戦略を実装する必要があります。
BufferOverflowExceptionを防ぐには、
onBackpressureBuffer()__メソッドを使用する必要があります。
onBackpressureBuffer()
メソッドは3つの引数を取ります:
Observable
バッファの容量、バッファがいっぱいになったときに呼び出されるメソッド、およびバッファから破棄される必要がある要素を扱うための戦略オーバーフローの戦略は
BackpressureOverflow
クラスにあります。
バッファがいっぱいになったときに実行できるアクションは4種類あります。
-
ON
OVERFLOW
ERROR –
これは、aを通知するデフォルトの動作です。
バッファがいっぱいのとき
BufferOverflowException
**
ON
OVERFLOW
DEFAULT –
現在は同じです。
ON
OVERFLOW
ERROR
**
ON
OVERFLOW
DROP
LATEST__ – オーバーフローが発生した場合、現在の
値は単純に無視され、古い値のみが配信されます
ダウンストリームの
Observer
が要求すると
**
ON
OVERFLOW
DROP
OLDEST__ – バッファ内の最も古い要素を削除します。
現在の値をそれに追加します
その戦略を指定する方法を見てみましょう。
Observable.range(1, 1__000__000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON__OVERFLOW__DROP__OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> {}, Throwable::printStackTrace);
ここでは、オーバーフローバッファを処理するための戦略として、バッファ内の最も古い要素を削除し、
Observable
によって生成された最新の項目を追加します。
最後の2つの戦略は、要素を削除するときにストリームに不連続性を生じさせることに注意してください。また、
BufferOverflowException
を通知することもありません。
7. すべての過剰生産要素を削除する
ダウンストリームの
Observer
が要素を受け取る準備ができていないときはいつでも、
onBackpressureDrop()
メソッドを使用してその要素をシーケンスから削除できます。
そのメソッドは、
ON
OVERFLOW
DROP
LATEST.
という戦略でバッファの容量をゼロに設定した
onBackpressureBuffer()__メソッドとして考えることができます
この演算子は、後で最新の値が表示されるため、ソースの
Observable
(マウスの移動や現在のGPS位置信号など)からの値を安全に無視できる場合に役立ちます。
Observable.range(1, 1__000__000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.doOnNext(ComputeFunction::compute)
.subscribe(v -> {}, Throwable::printStackTrace);
メソッド
onBackpressureDrop()
は、
Observable
の過剰生成の問題を排除していますが、注意して使用する必要があります。
8結論
この記事では、
オブザーバブル
を過剰生産する問題と、背圧に対処する方法について調べました。
Observer
が
Observable.
によって生成されるのと同じくらい早く要素を消費することができないときの要素のバッファリング、バッチ処理、およびスキップの戦略を調べました。
これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/rxjava[GitHubプロジェクト]で見つけることができます – これはMavenプロジェクトです。そのまま実行します。