RxJavaでのObservableの組み合わせ
1. 序章
このクイックチュートリアルでは、RxJavaでObservablesを組み合わせるさまざまな方法について説明します。
RxJavaを初めて使用する場合は、まずこのイントロチュートリアルを確認してください。
さあ、すぐに飛び込みましょう。
2. オブザーバブル
Observable シーケンス、または単に Observables は、非同期データストリームの表現です。
これらはObserverパターンに基づいており、Observerと呼ばれるオブジェクトがObservableによって発行されたアイテムをサブスクライブします。
Observer は、 Observable が将来放出するものに反応するため、サブスクリプションは非ブロッキングです。 これにより、同時実行が容易になります。
RxJavaでの簡単なデモンストレーションは次のとおりです。
Observable
.from(new String[] { "John", "Doe" })
.subscribe(name -> System.out.println("Hello " + name))
3. オブザーバブルの組み合わせ
リアクティブフレームワークを使用してプログラミングする場合、さまざまなObservablesを組み合わせるのが一般的なユースケースです。
たとえば、Webアプリケーションでは、互いに独立した2セットの非同期データストリームを取得する必要がある場合があります。
次のストリームを要求する前に前のストリームが完了するのを待つ代わりに、両方を同時に呼び出して、結合されたストリームをサブスクライブすることができます。
このセクションでは、RxJavaで複数の Observables を組み合わせることができるいくつかの異なる方法と、各メソッドが適用されるさまざまなユースケースについて説明します。
3.1. マージ
merge 演算子を使用して、複数の Observables の出力を組み合わせて、1つのように機能させることができます。
@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.merge(
Observable.from(new String[] {"Hello", "World"}),
Observable.from(new String[] {"I love", "RxJava"})
).subscribe(testSubscriber);
testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}
3.2. MergeDelayError
mergeDelayError メソッドは、複数の Observables を1つに結合するという点で、 merge と同じですが、マージ中にエラーが発生した場合、エラーが発生する可能性があります-エラーを伝播する前に続行する無料のアイテム:
@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.mergeDelayError(
Observable.from(new String[] { "hello", "world" }),
Observable.error(new RuntimeException("Some exception")),
Observable.from(new String[] { "rxjava" })
).subscribe(testSubscriber);
testSubscriber.assertValues("hello", "world", "rxjava");
testSubscriber.assertError(RuntimeException.class);
}
上記の例は、すべてのエラーのない値を出力します。
hello
world
rxjava
mergeDelayErrorの代わりにmergeを使用すると、
3.3. Zip
zip 拡張メソッドは、2つの値のシーケンスをペアとしてまとめます:
@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
List<String> zippedStrings = new ArrayList<>();
Observable.zip(
Observable.from(new String[] { "Simple", "Moderate", "Complex" }),
Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
assertThat(zippedStrings).isNotEmpty();
assertThat(zippedStrings.size()).isEqualTo(3);
assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}
3.4. 間隔のあるZip
この例では、間隔でストリームを圧縮します。これにより、実際には最初のストリームの要素の放出が遅延します。
@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
Observable
.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
.toBlocking().subscribe(testSubscriber);
testSubscriber.assertCompleted();
testSubscriber.assertValueCount(5);
testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}
4. 概要
この記事では、ObservablesをRxJavaと組み合わせるためのいくつかの方法を見てきました。 combineLatest 、 join 、 groupJoin 、 switchOnNext などの他のメソッドについては、公式RxJavaドキュメントで学ぶことができます。 。
いつものように、この記事のソースコードは、GitHubリポジトリで入手できます。