RxJavaでバックプレッシャに対処する
1. 概要
この記事では、RxJavaライブラリがバックプレッシャの処理にどのように役立つかを見ていきます。
簡単に言えば、RxJavaは、1つまたは複数のオブザーバブルがサブスクライブできるオブザーバブルを導入することにより、リアクティブストリームの概念を利用します。 背圧の問題に直面する必要があるため、無限の可能性のあるストリームを処理することは非常に困難です。
Observable が、サブスクライバーが消費できるよりも速くアイテムを放出している状況に陥ることは難しくありません。 未消費アイテムのバッファが増えるという問題のさまざまな解決策を見ていきます。
2. ホットオブザーバブルとコールドオブザーバブル
まず、後で定義するObservablesの要素のコンシューマーとして使用される単純なコンシューマー関数を作成しましょう。
public class ComputeFunction {
public static void compute(Integer v) {
try {
System.out.println("compute integer v: " + v);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
compute()関数は、単に引数を出力しています。 ここで注意すべき重要なことは、 Thread.sleep(1000)メソッドの呼び出しです。これは、Observableがアイテムでいっぱいになる長時間実行タスクをエミュレートするために実行しています。 オブザーバーがそれらを消費できるよりも速く。
オブザーバブルには、ホットとコールドの2種類があり、背圧処理に関してはまったく異なります。
2.1. コールドオブザーバブル
コールドObservableは特定のアイテムのシーケンスを放出しますが、 Observer が便利であると判断したときに、このシーケンスの放出を開始できます。シーケンスの整合性を損なうことなく。 ColdObservableは怠惰な方法でアイテムを提供しています。
Observer は、そのアイテムを処理する準備ができている場合にのみ要素を取得します。アイテムはプル方式で要求されるため、Observableにバッファリングする必要はありません。
たとえば、100万から100万までの要素の静的範囲に基づいて Observable を作成すると、その Observable は、それらのアイテムが観察される頻度に関係なく、同じシーケンスのアイテムを放出します。 :
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute);
プログラムを開始すると、アイテムはオブザーバーによって怠惰に計算され、プル方式で要求されます。 The Schedulers.computation() メソッドは、実行したいことを意味します観察者の計算スレッドプール内
プログラムの出力は、 Observableから1つずつ呼び出されたcompute()メソッドの結果で構成されます。
compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...
コールドオブザーバブルはプル方式で機能するため、いかなる形のバックプレッシャも必要ありません。 コールドObservableによって発行されるアイテムの例には、データベースクエリ、ファイル取得、またはWebリクエストの結果が含まれる場合があります。
2.2. ホットオブザーバブル
ホットなObservableはアイテムの生成を開始し、アイテムが作成されるとすぐにそれらを放出します。 これは、処理のコールドオブザーバブルプルモデルとは対照的です。 Hot Observableは独自のペースでアイテムを放出し、それに追いつくのはオブザーバー次第です。
オブザーバブルがオブザーバブルによって生成されるほど迅速にアイテムを消費できない場合、それらはメモリをいっぱいにするため、バッファリングまたは他の方法で処理する必要があります。最終的にOutOfMemoryException。を引き起こします
ホットなObservable、の例を考えてみましょう。これは、それらのアイテムを処理しているエンドユーザーに100万個のアイテムを生産しています。 Observerのcompute()メソッドがすべてのアイテムの処理に時間がかかると、 Observable がメモリをアイテムでいっぱいにし始め、プログラムが発生します失敗する:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
IntStream.range(1, 1_000_000).forEach(source::onNext);
Observable の過剰生成を処理する方法を定義しなかったため、そのプログラムの実行はMissingBackpressureExceptionで失敗します。
ホットから放出されるアイテムの例観察可能マウスとキーボードのイベント、システムイベント、または株価が含まれる場合があります。
3. バッファリングの過剰生成Observable
過剰生産に対処する最初の方法観察可能で処理できない要素に対して、ある種のバッファを定義することです。
buffer()メソッドを呼び出すことでそれを行うことができます:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.buffer(1024)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
サイズが1024のバッファーを定義すると、オブザーバーが過剰に生成されているソースに追いつくための時間が与えられます。 バッファには、まだ処理されていないアイテムが格納されます。
バッファサイズを増やして、生成される値に十分なスペースを確保できます。
ただし、ソースが予測されたバッファサイズを過剰に生成した場合でもオーバーフローが発生する可能性があるため、通常、これは一時的な修正にすぎないことに注意してください。
4. 放出されたアイテムのバッチ処理
N個の要素のウィンドウで過剰生産されたアイテムをバッチ処理できます。
ObservableがObserverが処理できるよりも速く要素を生成している場合、生成された要素をグループ化し、要素のバッチをObserverに送信することでこれを軽減できます。要素の代わりに要素のコレクションを1つずつ処理するには:
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.window(500)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
引数500でwindow()メソッドを使用すると、はObservableに要素を500サイズのバッチにグループ化するように指示します。 この手法は、オブザーバブルが要素を1つずつ処理する場合に比べて要素のバッチをより高速に処理できる場合に、オブザーバブルの過剰生成の問題を減らすことができます。
5. 要素をスキップする
Observable によって生成された値の一部を安全に無視できる場合は、特定の時間内のサンプリングとスロットル演算子を使用できます。
メソッドsample()およびthrottleFirst()は、パラメーターとして期間を取ります。
- s ample()メソッドは、要素のシーケンスを定期的に調べ、パラメーターとして指定された期間内に生成された最後のアイテムを出力します。
- throttleFirst()メソッドは、パラメーターとして指定された期間の後に生成された最初のアイテムを発行します
期間は、生成された要素のシーケンスから1つの特定の要素が選択されるまでの時間です。 要素をスキップすることで、背圧を処理するための戦略を指定できます。
PublishSubject<Integer> source = PublishSubject.<Integer>create();
source.sample(100, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation())
.subscribe(ComputeFunction::compute, Throwable::printStackTrace);
要素をスキップする戦略はsample()メソッドになるように指定しました。 100ミリ秒の期間のシーケンスのサンプルが必要です。 その要素はオブザーバーに放出されます。
ただし、これらの演算子は、ダウンストリームのオブザーバーによる値の受信率を低下させるだけであるため、MissingBackpressureExceptionが発生する可能性があることに注意してください。
6. 充填Observableバッファーの処理
要素のサンプリングまたはバッチ処理の戦略がバッファのいっぱいになるのに役立たない場合、バッファがいっぱいになったときにケースを処理する戦略を実装する必要があります。
BufferOverflowException。を防ぐために、 onBackpressureBuffer()メソッドを使用する必要があります。
onBackpressureBuffer()メソッドは、 Observable バッファーの容量、バッファーがいっぱいになったときに呼び出されるメソッド、および必要な要素を処理するための戦略の3つの引数を取ります。バッファから破棄されます。 オーバーフローの戦略は、BackpressureOverflowクラスにあります。
バッファがいっぱいになったときに実行できるアクションには、次の4種類があります。
- ON_OVERFLOW_ERROR – これは、バッファーがいっぱいになったときにBufferOverflowExceptionを通知するデフォルトの動作です。
- ON_OVERFLOW_DEFAULT – 現在、ON_OVERFLOW_ERRORと同じです。
- ON_OVERFLOW_DROP_LATEST –オーバーフローが発生した場合、現在の値は単に無視され、ダウンストリームのObserverが要求すると古い値のみが配信されます。
- ON_OVERFLOW_DROP_OLDEST –バッファー内の最も古い要素を削除し、現在の値を追加します
その戦略を指定する方法を見てみましょう。
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> {}, Throwable::printStackTrace);
ここで、オーバーフローバッファを処理するための戦略は、バッファ内の最も古い要素を削除し、Observableによって生成された最新のアイテムを追加することです。
最後の2つの戦略は、要素をドロップアウトするときにストリームの不連続性を引き起こすことに注意してください。 さらに、BufferOverflowExceptionを通知しません。
7. 過剰生産された要素をすべて削除する
ダウンストリームのObserverが要素を受け取る準備ができていないときはいつでも、 onBackpressureDrop()メソッドを使用してその要素をシーケンスから削除できます。
その方法は、 onBackpressureBuffer() 戦略でバッファの容量をゼロに設定したメソッド
この演算子は、ソース Observable からの値(マウスの動きや現在のGPS位置信号など)を安全に無視できる場合に役立ちます。これは、後でより最新の値が得られるためです。
Observable.range(1, 1_000_000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.doOnNext(ComputeFunction::compute)
.subscribe(v -> {}, Throwable::printStackTrace);
メソッドonBackpressureDrop()は、 Observable の過剰生成の問題を排除していますが、注意して使用する必要があります。
8. 結論
この記事では、Observableの過剰生産の問題とバックプレッシャーへの対処方法について説明しました。 Observer が、Observableによって生成されるほど速く要素を消費できない場合の要素のバッファリング、バッチ処理、およびスキップの戦略を検討しました。
これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。