ProjectReactorでパブリッシャーを組み合わせる
1. 概要
この記事では、プロジェクトReactorでパブリッシャーを組み合わせるさまざまな方法を見ていきます。
2. Mavenの依存関係
プロジェクトReactorの依存関係を使用して例を設定しましょう。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.4.RELEASE</version>
<scope>test</scope>
</dependency>
3. パブリッシャーの組み合わせ
で作業する必要があるシナリオを考えるとフラックス
での静的メソッドの使用法を説明するために、いくつかの例を作成しましょう。 フラックス
この例では、タイプの2つのパブリッシャーを使用しますフラックス
oddNumbers と、奇数のIntegerタイプのFluxを作成します。
Flux<Integer> evenNumbers = Flux
.range(min, max)
.filter(x -> x % 2 == 0); // i.e. 2, 4
Flux<Integer> oddNumbers = Flux
.range(min, max)
.filter(x -> x % 2 > 0); // ie. 1, 3, 5
3.1. c oncat()
concat メソッドは、入力の連結を実行し、ソースから放出された要素をダウンストリームに転送します。
連結は、最初のソースを順番にサブスクライブし、それが完了するのを待ってから次のソースをサブスクライブするというように、最後のソースが完了するまで実行されます。 エラーが発生すると、シーケンスはすぐに中断され、ダウンストリームに転送されます。
簡単な例を次に示します。
@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
Flux<Integer> fluxOfIntegers = Flux.concat(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.2. concatWith()
静的メソッドを使用する concatWith 、タイプの2つのソースの連結を生成しますフラックス
@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
// same stepVerifier as in the concat example above
}
3.3. c ombineLatest()
Flux静的メソッドcombineLatestは、各パブリッシャーソースから最近公開された値の組み合わせによって提供されるデータを生成します。
次に、2つのPublisherソースとBiFunctionをパラメーターとして使用したこのメソッドの使用例を示します。
@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(5) // 4 + 1
.expectNext(7) // 4 + 3
.expectNext(9) // 4 + 5
.expectComplete()
.verify();
}
ここで、関数combineLatestがevenNumbers( 4 )の最新要素との要素を使用して関数「a+b」を適用したことがわかります。 oddNumbers(1,3,5)、したがってシーケンス5,7,9を生成します。
3.4. merge()
merge 関数は、配列に含まれる Publisher シーケンスからのデータを、インターリーブされたマージシーケンスにマージします。
@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
注意すべき興味深い点は、 concat( lazy Subscription )とは対照的に、ソースが熱心にサブスクライブされていることです。
ここで、パブリッシャーの要素間に遅延を挿入すると、merge関数の異なる結果を確認できます。
@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.5. mergeSequential()
mergeSequential メソッドは、配列で提供される Publisher シーケンスからのデータを、順序付けられたマージされたシーケンスにマージします。
concat とは異なり、ソースは熱心にサブスクライブされます。
また、 merge とは異なり、それらの出力された値は、サブスクリプション順に最終シーケンスにマージされます。
@Test
public void testMergeSequential() {
Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.6. mergeDelayError()
mergeDelayError は、配列に含まれるパブリッシャーシーケンスからのデータをインターリーブされたマージされたシーケンスにマージします。
concat とは異なり、ソースは熱心にサブスクライブされます。
静的mergeメソッドのこのバリアントは、残りのマージバックログが処理されるまでエラーを遅らせます。
mergeDelayError:の例を次に示します。
@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.7. と合併()
静的メソッドmergeWithは、このFluxとPublisherからのデータをインターリーブされたマージされたシーケンスにマージします。
繰り返しますが、 concat とは異なり、内部ソースは熱心にサブスクライブされます。
@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
// same StepVerifier as in "3.4. Merge"
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.8. z ip()
静的メソッドzipは、複数のソースを一緒に凝集します。つまり、すべてのソースが1つの要素を放出するのを待ち、これらの要素を出力値に結合します(提供されたコンビネーター関数によって構築されます)。
オペレーターは、ソースのいずれかが完了するまでそうし続けます。
@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(3) // 2 + 1
.expectNext(7) // 4 + 3
.expectComplete()
.verify();
}
evenNumbers からペアリングする要素が残っていないため、oddNumbersパブリッシャーの要素5は無視されます。
3.9. z ipWith()
zipWith は、 zip と同じメソッドを実行しますが、パブリッシャーが2つしかない場合のみです。
@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers, (a, b) -> a * b);
StepVerifier.create(fluxOfIntegers)
.expectNext(2) // 2 * 1
.expectNext(12) // 4 * 3
.expectComplete()
.verify();
}
4. 結論
このクイックチュートリアルでは、パブリッシャーを組み合わせる複数の方法を発見しました。
いつものように、例はGitHubので利用できます。