1概要

この記事では、https://projectreactor.io/[プロジェクトの反応器]で

Publishers

を組み合わせるさまざまな方法について説明します。


2 Mavenの依存関係

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.1.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.1.4.RELEASE</version>
    <scope>test</scope>
</dependency>


3出版社の結合


Flux <T>

または__Mono <T>を操作する必要があるシナリオでは、ストリームを組み合わせる方法はいくつかあります。


Flux <T>

クラスの静的メソッド(

concat、concatWith、merge、zip



combineLatest.

など)の使い方を説明するための例をいくつか作成しましょう。

この例では

Flux <Integer>

という2つの発行元、つまり

EvenNumbers

を使用します。これは

Integer



Flux

であり、1(

min

variable)から始まり5(

max

variable)までに制限された偶数のシーケンスを持ちます。


oddNumbers

、また奇数の

Integer

型の

Flux

を作成します。

Flux<Integer> evenNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 == 0);//i.e. 2, 4

Flux<Integer> oddNumbers = Flux
  .range(min, max)
  .filter(x -> x % 2 > 0); //ie. 1, 3, 5


3.1. c


_oncat()

_



concat

メソッドは入力の連結を実行し、ソースから発行された要素を下流に転送します。

連結は、最初のソースを順次サブスクライブし、次にそれが完了するのを待ってから次のソースをサブスクライブするという具合に、最後のソースが完了するまで同様に続きます。エラーが発生するとすぐにシーケンスが中断され、ダウンストリームに転送されます。

これは簡単な例です。

@Test
public void givenFluxes__whenConcatIsInvoked__thenConcat() {
    Flux<Integer> fluxOfIntegers = Flux.concat(
      evenNumbers,
      oddNumbers);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}


3.2.



concatWith()


結果として静的メソッド

concatWith

を使用して、タイプ

Flux <T>

の2つのソースの連結を生成します。

@Test
public void givenFluxes__whenConcatWithIsInvoked__thenConcatWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);

   //same stepVerifier as in the concat example above
}


3.3. c


_ombineLatest()

_


Flux静的メソッド

combineLatest

は、各パブリッシャソースから最後に公開された値の組み合わせによって提供されるデータを生成します。

これは、2つの

Publisher

ソースと1つの

BiFunction

をパラメーターとして使用したこのメソッドの使用例です。

@Test
public void givenFluxes__whenCombineLatestIsInvoked__thenCombineLatest() {
    Flux<Integer> fluxOfIntegers = Flux.combineLatest(
      evenNumbers,
      oddNumbers,
      (a, b) -> a + b);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(5)//4 + 1
      .expectNext(7)//4 + 3
      .expectNext(9)//4 + 5
      .expectComplete()
      .verify();
}

ここでは、関数

combineLatest

が、最新の

evenNumbers



4

)の要素と

oddNumbers(1,3,5)

の要素を使用して関数 “a b”を適用して、シーケンス

5,7,9

が生成されることがわかります。


3.4.

マージ()



merge

関数は、配列に含まれている

Publisher

シーケンスのデータを、インターリーブされたマージ済みシーケンスにマージします。

@Test
public void givenFluxes__whenMergeIsInvoked__thenMerge() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers,
      oddNumbers);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

注意すべき興味深いことは、

__concat(


lazy subscription




__とは対照的に、ソースは熱心に購読されているということです。

ここで、Publishersの要素間に遅延を挿入すると、

merge

関数の結果が異なることがわかります。

@Test
public void givenFluxes__whenMergeWithDelayedElementsIsInvoked__thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers.delayElements(Duration.ofMillis(500L)),
      oddNumbers.delayElements(Duration.ofMillis(300L)));

    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}

3.5.


mergeSequential()



mergeSequential

メソッドは、配列で提供された

Publisher

シーケンスのデータを順序付けられたマージシーケンスにマージします。


concat

とは異なり、情報源は熱心に購読しています。

また、

merge

とは異なり、それらの発行された値は、サブスクリプション順に最終シーケンスにマージされます。

@Test
public void testMergeSequential() {
    Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
      evenNumbers,
      oddNumbers);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}

3.6


mergeDelayError()



mergeDelayError

は、配列に含まれているパブリッシャシーケンスのデータをインターリーブされたマージシーケンスにマージします。


concat

とは異なり、情報源は熱心に購読しています。

この静的な

merge

メソッドの変形は、残りのマージバックログが処理されるまでエラーを遅らせます。

これは__mergeDelayErrorの例です。

@Test
public void givenFluxes__whenMergeWithDelayedElementsIsInvoked__thenMergeWithDelayedElements() {
    Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
      evenNumbers.delayElements(Duration.ofMillis(500L)),
      oddNumbers.delayElements(Duration.ofMillis(300L)));

    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();
}



3.7. と合併()


静的メソッド

mergeWith

は、この

Flux



Publisher

からのデータをインターリーブされたマージ済みシーケンスにマージします。

繰り返しになりますが、

concat

とは異なり、内部の情報源は熱心に購読しています。

@Test
public void givenFluxes__whenMergeWithIsInvoked__thenMergeWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);

   //same StepVerifier as in "3.4. Merge"
    StepVerifier.create(fluxOfIntegers)
      .expectNext(2)
      .expectNext(4)
      .expectNext(1)
      .expectNext(3)
      .expectNext(5)
      .expectComplete()
      .verify();
}


3.8. z


_ip()

_


静的メソッド

zip

は、複数のソースを集約します。つまり、すべてのソースが1つの要素を発行するのを待って、これらの要素を(提供された組み合わせ関数によって構築された)出力値に組み合わせます。

オペレータは、いずれかのソースが完了するまでそうし続けます。

@Test
public void givenFluxes__whenZipIsInvoked__thenZip() {
    Flux<Integer> fluxOfIntegers = Flux.zip(
      evenNumbers,
      oddNumbers,
      (a, b) -> a + b);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(3)//2 + 1
      .expectNext(7)//4 + 3
      .expectComplete()
      .verify();
}


evenNumbers

からペアリングする要素が残っていないため、

oddNumbers

Publisherからの要素5は無視されます。


3.9. z


_ipWith()

_



zipWith



zip

と同じメソッドを実行しますが、2人のパブリッシャでのみ実行されます。

@Test
public void givenFluxes__whenZipWithIsInvoked__thenZipWith() {
    Flux<Integer> fluxOfIntegers = evenNumbers
     .zipWith(oddNumbers, (a, b) -> a **  b);

    StepVerifier.create(fluxOfIntegers)
      .expectNext(2) //2 **  1
      .expectNext(12)//4 **  3
      .expectComplete()
      .verify();
}


4結論

このクイックチュートリアルでは、

Publishers.

を組み合わせる複数の方法を発見しました。

いつものように、例はhttps://github.com/eugenp/tutorials/tree/master/reactor-core[GitHubの上に]で利用可能です。