1. 概要

この記事では、プロジェクトReactorパブリッシャーを組み合わせるさまざまな方法を見ていきます。

2. Mavenの依存関係

プロジェクトReactorの依存関係を使用して例を設定しましょう。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.1.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.1.4.RELEASE</version>
    <scope>test</scope>
</dependency>

3. パブリッシャーの組み合わせ

で作業する必要があるシナリオを考えるとフラックスまた単核症 、ストリームを組み合わせるにはさまざまな方法があります。

での静的メソッドの使用法を説明するために、いくつかの例を作成しましょう。 フラックスなどのクラス concat、concatWith、merge、zip CombineLatest。

この例では、タイプの2つのパブリッシャーを使用しますフラックス 、すなわち偶数 、 これはフラックス整数 1で始まる偶数のシーケンスを保持します(可変)および5で制限( 最大変数)。

oddNumbers と、奇数のIntegerタイプのFluxを作成します。

Flux<Integer> evenNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 == 0); // i.e. 2, 4

Flux<Integer> oddNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 > 0);  // ie. 1, 3, 5

3.1. c oncat()

concat メソッドは、入力の連結を実行し、ソースから放出された要素をダウンストリームに転送します。

連結は、最初のソースを順番にサブスクライブし、それが完了するのを待ってから次のソースをサブスクライブするというように、最後のソースが完了するまで実行されます。 エラーが発生すると、シーケンスはすぐに中断され、ダウンストリームに転送されます。

簡単な例を次に示します。

@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
    Flux<Integer> fluxOfIntegers = Flux.concat(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.2. concatWith()

静的メソッドを使用する concatWith 、タイプの2つのソースの連結を生成しますフラックス結果として:

@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
        
    // same stepVerifier as in the concat example above
}

3.3. c ombineLatest()

Flux静的メソッドcombineLatestは、各パブリッシャーソースから最近公開された値の組み合わせによって提供されるデータを生成します。

次に、2つのPublisherソースとBiFunctionをパラメーターとして使用したこのメソッドの使用例を示します。

@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
    Flux<Integer> fluxOfIntegers = Flux.combineLatest(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(5) // 4 + 1
      .expectNext(7) // 4 + 3
      .expectNext(9) // 4 + 5
      .expectComplete()
      .verify();
}

ここで、関数combineLatestevenNumbers 4 )の最新要素との要素を使用して関数「a+b」を適用したことがわかります。 oddNumbers(1,3,5)、したがってシーケンス5,7,9を生成します。

3.4. merge()

merge 関数は、配列に含まれる Publisher シーケンスからのデータを、インターリーブされたマージシーケンスにマージします。

@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

注意すべき興味深い点は、 concat( lazy Subscription とは対照的に、ソースが熱心にサブスクライブされていることです。

ここで、パブリッシャーの要素間に遅延を挿入すると、merge関数の異なる結果を確認できます。

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}

3.5. mergeSequential()

mergeSequential メソッドは、配列で提供される Publisher シーケンスからのデータを、順序付けられたマージされたシーケンスにマージします。

concat とは異なり、ソースは熱心にサブスクライブされます。

また、 merge とは異なり、それらの出力された値は、サブスクリプション順に最終シーケンスにマージされます。

@Test
public void testMergeSequential() {
    Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
      evenNumbers, 
      oddNumbers);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.6. mergeDelayError()

mergeDelayError は、配列に含まれるパブリッシャーシーケンスからのデータをインターリーブされたマージされたシーケンスにマージします。

concat とは異なり、ソースは熱心にサブスクライブされます。

静的mergeメソッドのこのバリアントは、残りのマージバックログが処理されるまでエラーを遅らせます。

mergeDelayError:の例を次に示します。

@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1, 
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}

3.7. と合併()

静的メソッドmergeWithは、このFluxPublisherからのデータをインターリーブされたマージされたシーケンスにマージします。

繰り返しますが、 concat とは異なり、内部ソースは熱心にサブスクライブされます。

@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
        
    // same StepVerifier as in "3.4. Merge"
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.8. z ip()

静的メソッドzipは、複数のソースを一緒に凝集します。つまり、すべてのソースが1つの要素を放出するのを待ち、これらの要素を出力値に結合します(提供されたコンビネーター関数によって構築されます)。

オペレーターは、ソースのいずれかが完了するまでそうし続けます。

@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
    Flux<Integer> fluxOfIntegers = Flux.zip(
      evenNumbers, 
      oddNumbers, 
      (a, b) -> a + b);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(3) // 2 + 1
      .expectNext(7) // 4 + 3
      .expectComplete()
      .verify();
}

evenNumbers からペアリングする要素が残っていないため、oddNumbersパブリッシャーの要素5は無視されます。

3.9. z ipWith()

zipWith は、 zip と同じメソッドを実行しますが、パブリッシャーが2つしかない場合のみです。

@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers
     .zipWith(oddNumbers, (a, b) -> a * b);
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)  // 2 * 1
      .expectNext(12) // 4 * 3
      .expectComplete()
      .verify();
}

4. 結論

このクイックチュートリアルでは、パブリッシャーを組み合わせる複数の方法を発見しました。

いつものように、例はGitHubで利用できます。