1. 序章

このクイックチュートリアルでは、RxJavaでObservablesを組み合わせるさまざまな方法について説明します。

RxJavaを初めて使用する場合は、まずこのイントロチュートリアルを確認してください。

さあ、すぐに飛び込みましょう。

2. オブザーバブル

Observable シーケンス、または単に Observables は、非同期データストリームの表現です。

これらはObserverパターンに基づいており、Observerと呼ばれるオブジェクトがObservableによって発行されたアイテムをサブスクライブします。

Observer は、 Observable が将来放出するものに反応するため、サブスクリプションは非ブロッキングです。 これにより、同時実行が容易になります。

RxJavaでの簡単なデモンストレーションは次のとおりです。

Observable
  .from(new String[] { "John", "Doe" })
  .subscribe(name -> System.out.println("Hello " + name))

3. オブザーバブルの組み合わせ

リアクティブフレームワークを使用してプログラミングする場合、さまざまなObservablesを組み合わせるのが一般的なユースケースです。

たとえば、Webアプリケーションでは、互いに独立した2セットの非同期データストリームを取得する必要がある場合があります。

次のストリームを要求する前に前のストリームが完了するのを待つ代わりに、両方を同時に呼び出して、結合されたストリームをサブスクライブすることができます。

このセクションでは、RxJavaで複数の Observables を組み合わせることができるいくつかの異なる方法と、各メソッドが適用されるさまざまなユースケースについて説明します。

3.1. マージ

merge 演算子を使用して、複数の Observables の出力を組み合わせて、1つのように機能させることができます。

@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();

    Observable.merge(
      Observable.from(new String[] {"Hello", "World"}),
      Observable.from(new String[] {"I love", "RxJava"})
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}

3.2. MergeDelayError

mergeDelayError メソッドは、複数の Observables を1つに結合するという点で、 merge と同じですが、マージ中にエラーが発生した場合、エラーが発生する可能性があります-エラーを伝播する前に続行する無料のアイテム

@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable.mergeDelayError(
      Observable.from(new String[] { "hello", "world" }),
      Observable.error(new RuntimeException("Some exception")),
      Observable.from(new String[] { "rxjava" })
    ).subscribe(testSubscriber);

    testSubscriber.assertValues("hello", "world", "rxjava");
    testSubscriber.assertError(RuntimeException.class);
}

上記の例は、すべてのエラーのない値を出力します。

hello
world
rxjava

mergeDelayErrorの代わりにmergeを使用すると、 merge [のため、 String rxjava”は出力されないことに注意してください。 X144X]は、エラーが発生すると、Observablesからのデータの流れを即座に停止します。

3.3. Zip

zip 拡張メソッドは、2つの値のシーケンスをペアとしてまとめます

@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
    List<String> zippedStrings = new ArrayList<>();

    Observable.zip(
      Observable.from(new String[] { "Simple", "Moderate", "Complex" }), 
      Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
      (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
        
    assertThat(zippedStrings).isNotEmpty();
    assertThat(zippedStrings.size()).isEqualTo(3);
    assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}

3.4. 間隔のあるZip

この例では、間隔でストリームを圧縮します。これにより、実際には最初のストリームの要素の放出が遅延します。

@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        
    Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
        
    Observable
      .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
      .toBlocking().subscribe(testSubscriber);
        
    testSubscriber.assertCompleted();
    testSubscriber.assertValueCount(5);
    testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}

4. 概要

この記事では、ObservablesをRxJavaと組み合わせるためのいくつかの方法を見てきました。 combineLatest join groupJoin switchOnNext などの他のメソッドについては、公式RxJavaドキュメントで学ぶことができます。 。

いつものように、この記事のソースコードは、GitHubリポジトリで入手できます。