RxJavaでのオブザーバブルのフィルタリング
1. 序章
RxJava の概要に続いて、フィルタリング演算子について説明します。
特に、フィルタリング、スキップ、時間フィルタリング、およびいくつかのより高度なフィルタリング操作に焦点を当てます。
2. フィルタリング
Observable を使用する場合、放出されたアイテムのサブセットのみを選択すると便利な場合があります。 この目的のために、RxJavaはさまざまなフィルタリング機能を提供します。
filterメソッドを見てみましょう。
2.1. filter演算子
簡単に言えば、フィルターオペレーターは、オブザーバブルをフィルター処理して、放出されたアイテムが述語の形式で提供される指定された条件に一致することを確認します。
放出された値から奇数値のみをフィルタリングする方法を見てみましょう。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.filter(i -> i % 2 != 0);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 3, 5, 7, 9);
2.2. take演算子
take でフィルタリングする場合、ロジックにより、残りのアイテムを無視して、最初のnアイテムが発行されます。
source Observable をフィルタリングして、最初の2つのアイテムのみを出力する方法を見てみましょう。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.take(3);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
2.3. takeWhile演算子
takeWhile、を使用する場合、フィルター処理された Observable は、Predicateと一致しない最初の要素に遭遇するまでアイテムを放出し続けます。
takeWhile –フィルタリング述語:を使用する方法を見てみましょう。
Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.takeWhile(i -> i < 4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
2.4. takeFirst演算子
特定の条件に一致する最初のアイテムのみを発行する場合は、 takeFirst()。を使用できます。
5より大きい最初のアイテムを発行する方法を簡単に見てみましょう。
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.takeFirst(x -> x > 5);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(7);
2.5. firstおよびfirstOrDefault演算子
同様の動作は、 firstAPIを使用して実現できます。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.first();
filteredObservable.subscribe(subscriber);
subscriber.assertValue(1);
ただし、デフォルト値を指定する場合、アイテムが発行されない場合は、 f irstOrDefaultを使用できます。
Observable<Integer> sourceObservable = Observable.empty();
Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.6. takeLast演算子
次に、Observableによって発行された最後のnアイテムのみを発行する場合は、takeLastを使用できます。
最後の3つのアイテムだけを放出することがどのように可能であるかを見てみましょう:
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.takeLast(3);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(8, 9, 10);
これにより、ソースObservableからのアイテムの放出が完了するまで遅延することを覚えておく必要があります。
2.7. lastおよびlastOrDefault
takeLast(1)を使用する以外に、最後の要素のみを出力する場合は、lastを使用できます。
これにより、 Observable がフィルタリングされ、最後の要素のみが出力されます。これにより、オプションでフィルタリングPredicateが検証されます。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.last(i -> i % 2 != 0);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(9);
Observable が空の場合、 lastOrDefault を使用して、Observableをフィルタリングしてデフォルト値を出力できます。
lastOrDefault 演算子が使用され、フィルタリング条件を検証する項目がない場合にも、デフォルト値が発行されます。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable =
sourceObservable.lastOrDefault(-1, i -> i > 10);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.8. elementAtおよびelementAtOrDefault演算子
elementAt 演算子を使用すると、ソース Observable から発行された単一のアイテムを選択し、そのインデックスを指定できます。
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.elementAt(4);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(7);
ただし、指定されたインデックスが放出されたアイテムの数を超える場合、elementAtはIndexOutOfBoundExceptionをスローします。
この状況を回避するために、 elementAtOrDefault – を使用することができます。これは、インデックスが範囲外の場合にデフォルト値を返します。
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable
= sourceObservable.elementAtOrDefault(7, -1);
filteredObservable.subscribe(subscriber);
subscriber.assertValue(-1);
2.9. ofType演算子
ObservableがObjectアイテムを発行するときはいつでも、それらのタイプに基づいてそれらをフィルタリングすることが可能です。
放出されたStringタイプのアイテムのみをフィルタリングする方法を見てみましょう。
Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();
Observable filteredObservable = sourceObservable.ofType(String.class);
filteredObservable.subscribe(subscriber);
subscriber.assertValues("two", "five");
3. スキップ
一方、 Observable によって発行されたアイテムの一部を除外またはスキップする場合、 RxJavaは、フィルタリング演算子の対応として、いくつかの演算子を提供します。以前に説明しました。
takeに対応するskip演算子を見てみましょう。
3.1. skip演算子
Observable が一連のアイテムを放出する場合、 skip を使用して、最初に放出されたアイテムの一部を除外またはスキップすることができます。
例えば。 最初の4つの要素をスキップする方法を見てみましょう。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.skip(4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(5, 6, 7, 8, 9, 10);
3.2. skipWhile演算子
Observable によって発行され、フィルタリング述語に失敗した最初の値をすべて除外する場合は、skipWhile演算子を使用できます。
Observable<Integer> sourceObservable = Observable
.just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable
.skipWhile(i -> i < 4);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(4, 5, 4, 3, 2, 1);
3.3。 The skipLast オペレーター
skipLast 演算子を使用すると、 Observable によって発行された最後のアイテムをスキップして、それらの前に発行されたアイテムのみを受け入れることができます。
これにより、たとえば、最後の5つの項目をスキップできます。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = sourceObservable.skipLast(5);
filteredObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3, 4, 5);
3.4. distinctおよびdistinctUntilChanged演算子
distinct オペレーターは、 Observable を返します。これは、sourceObservableによって発行された個別のすべてのアイテムを発行します。
Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> distinctObservable = sourceObservable.distinct();
distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 3);
ただし、 sourceObservable によって放出され、直前のアイテムとは異なるすべてのアイテムを放出する Observable を取得する場合は、distinctUntilChanged演算子を使用できます。 :
Observable<Integer> sourceObservable = Observable
.just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();
distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 1, 3, 1);
3.5. ignoreElements演算子
sourceObservable によって発行されたすべての要素を無視したい場合はいつでも、 ignoreElements:を使用できます。
Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();
ignoredObservable.subscribe(subscriber);
subscriber.assertNoValues();
4. 時間フィルタリング演算子
観測可能なシーケンスを操作する場合、時間軸は不明ですが、シーケンスからタイムリーなデータを取得すると便利な場合があります。
この目的で、 RxJavaは、時間軸も使用してObservableを操作できるようにするいくつかのメソッドを提供します。
最初のものに移る前に、毎秒アイテムを放出する時限Observableを定義しましょう。
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable = Observable
.just(1, 2, 3, 4, 5, 6)
.zipWith(Observable.interval(
0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);
TestSchedulerは、時計を手動で進めることができる特別なスケジューラーです。
4.1. sampleおよびthrottleLast演算子
sample オペレーターは、 timedObservable をフィルター処理し、期間の時間間隔内にこのAPIによって発行された最新のアイテムを発行するObservableを返します。
timedObservable をサンプリングして、2.5秒ごとに最後に放出されたアイテムのみをフィルタリングする方法を見てみましょう。
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> sampledObservable = timedObservable
.sample(2500L, TimeUnit.MILLISECONDS, testScheduler);
sampledObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(3, 5, 6);
この種の動作は、throttleLast演算子を使用しても実現できます。
4.2. スロットルファーストオペレーター
throttleFirst演算子はthrottleLast/ sample とは異なり、 timedObservable によって放出された最初のアイテムを、最後に放出されたものではなく、各サンプリング期間に放出します。
4秒のサンプリング期間を使用して、最初のアイテムを放出する方法を見てみましょう。
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.throttleFirst(4100L, TimeUnit.SECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(1, 6);
4.3. デバウンスおよびthrottleWithTimeout演算子
debounce 演算子を使用すると、特定の期間が経過した場合に、別のアイテムを発行せずに、アイテムのみを発行することができます。
したがって、放出されたアイテム間の時間間隔よりも大きいタイムスパンを選択した場合、 timedObservable 、最後のものだけを放出します
最初のシナリオで何が起こるか見てみましょう:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValue(6);
この種の動作は、throttleWithTimeoutを使用して実現することもできます。
4.4. タイムアウト演算子
timeout オペレーターは、ソース Observable をミラーリングしますが、ソース Observable が指定された時間内にアイテムの放出に失敗した場合、通知エラーを発行し、アイテムの放出を中止します時間間隔。
timedObservableに500ミリ秒のタイムアウトを指定するとどうなるか見てみましょう。
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.timeout(500L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);
5. 複数の観測可能なフィルタリング
Observable を使用する場合、2番目のObservableに基づいてアイテムをフィルタリングするかスキップするかを決定することは間違いなく可能です。
先に進む前に、 delaydObservable を定義しましょう。これは、3秒後に1つのアイテムのみを放出します。
Observable<Integer> delayedObservable = Observable.just(1)
.delay(3, TimeUnit.SECONDS, testScheduler);
takeUntil演算子から始めましょう。
5.1. takeUntil演算子
takeUntil オペレーターは、2番目の Observable ( delaydObservable )の後、ソース Observable ( timedObservable )によって発行されたアイテムを破棄します。 )アイテムを発行するか、終了します:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.skipUntil(delayedObservable);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(4, 5, 6);
5.2. skipUntil演算子
一方、 skipUntil は、ソース Observable ( timedObservable )から放出されたアイテムを2番目の Observable ( delaydObservable )アイテムを放出します:
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable
.takeUntil(delayedObservable);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertValues(1, 2, 3);
6. 結論
この広範なチュートリアルでは、RxJava内で使用可能なさまざまなフィルタリング演算子について説明し、それぞれの簡単な例を示します。
いつものように、この記事のすべてのコード例は、GitHubのにあります。