プロジェクトリアクタでパブリッシャを結合する
1概要
この記事では、https://projectreactor.io/[プロジェクトの反応器]で
Publishers
を組み合わせるさまざまな方法について説明します。
2 Mavenの依存関係
https://search.maven.org/classic/#search%7Cga%7C1%7Cg%3A%22io.projectreactor%22%20AND%20a%3A%22reactor-core%22
[Project Reactorを使って例を設定しましょう依存関係:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.4.RELEASE</version>
<scope>test</scope>
</dependency>
3出版社の結合
Flux <T>
または__Mono <T>を操作する必要があるシナリオでは、ストリームを組み合わせる方法はいくつかあります。
Flux <T>
クラスの静的メソッド(
concat、concatWith、merge、zip
、
combineLatest.
など)の使い方を説明するための例をいくつか作成しましょう。
この例では
Flux <Integer>
という2つの発行元、つまり
EvenNumbers
を使用します。これは
Integer
の
Flux
であり、1(
min
variable)から始まり5(
max
variable)までに制限された偶数のシーケンスを持ちます。
oddNumbers
、また奇数の
Integer
型の
Flux
を作成します。
Flux<Integer> evenNumbers = Flux
.range(min, max)
.filter(x -> x % 2 == 0);//i.e. 2, 4
Flux<Integer> oddNumbers = Flux
.range(min, max)
.filter(x -> x % 2 > 0); //ie. 1, 3, 5
3.1. c
_oncat()
_
concat
メソッドは入力の連結を実行し、ソースから発行された要素を下流に転送します。
連結は、最初のソースを順次サブスクライブし、次にそれが完了するのを待ってから次のソースをサブスクライブするという具合に、最後のソースが完了するまで同様に続きます。エラーが発生するとすぐにシーケンスが中断され、ダウンストリームに転送されます。
これは簡単な例です。
@Test
public void givenFluxes__whenConcatIsInvoked__thenConcat() {
Flux<Integer> fluxOfIntegers = Flux.concat(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.2.
concatWith()
結果として静的メソッド
concatWith
を使用して、タイプ
Flux <T>
の2つのソースの連結を生成します。
@Test
public void givenFluxes__whenConcatWithIsInvoked__thenConcatWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
//same stepVerifier as in the concat example above
}
3.3. c
_ombineLatest()
_
Flux静的メソッド
combineLatest
は、各パブリッシャソースから最後に公開された値の組み合わせによって提供されるデータを生成します。
これは、2つの
Publisher
ソースと1つの
BiFunction
をパラメーターとして使用したこのメソッドの使用例です。
@Test
public void givenFluxes__whenCombineLatestIsInvoked__thenCombineLatest() {
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(5)//4 + 1
.expectNext(7)//4 + 3
.expectNext(9)//4 + 5
.expectComplete()
.verify();
}
ここでは、関数
combineLatest
が、最新の
evenNumbers
(
4
)の要素と
oddNumbers(1,3,5)
の要素を使用して関数 “a b”を適用して、シーケンス
5,7,9
が生成されることがわかります。
3.4.
マージ()
merge
関数は、配列に含まれている
Publisher
シーケンスのデータを、インターリーブされたマージ済みシーケンスにマージします。
@Test
public void givenFluxes__whenMergeIsInvoked__thenMerge() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
注意すべき興味深いことは、
__concat(
lazy subscription
)
__とは対照的に、ソースは熱心に購読されているということです。
ここで、Publishersの要素間に遅延を挿入すると、
merge
関数の結果が異なることがわかります。
@Test
public void givenFluxes__whenMergeWithDelayedElementsIsInvoked__thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.5.
mergeSequential()
mergeSequential
メソッドは、配列で提供された
Publisher
シーケンスのデータを順序付けられたマージシーケンスにマージします。
concat
とは異なり、情報源は熱心に購読しています。
また、
merge
とは異なり、それらの発行された値は、サブスクリプション順に最終シーケンスにマージされます。
@Test
public void testMergeSequential() {
Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.6
mergeDelayError()
mergeDelayError
は、配列に含まれているパブリッシャシーケンスのデータをインターリーブされたマージシーケンスにマージします。
concat
とは異なり、情報源は熱心に購読しています。
この静的な
merge
メソッドの変形は、残りのマージバックログが処理されるまでエラーを遅らせます。
これは__mergeDelayErrorの例です。
@Test
public void givenFluxes__whenMergeWithDelayedElementsIsInvoked__thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.7. と合併()
静的メソッド
mergeWith
は、この
Flux
と
Publisher
からのデータをインターリーブされたマージ済みシーケンスにマージします。
繰り返しになりますが、
concat
とは異なり、内部の情報源は熱心に購読しています。
@Test
public void givenFluxes__whenMergeWithIsInvoked__thenMergeWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
//same StepVerifier as in "3.4. Merge"
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.8. z
_ip()
_
静的メソッド
zip
は、複数のソースを集約します。つまり、すべてのソースが1つの要素を発行するのを待って、これらの要素を(提供された組み合わせ関数によって構築された)出力値に組み合わせます。
オペレータは、いずれかのソースが完了するまでそうし続けます。
@Test
public void givenFluxes__whenZipIsInvoked__thenZip() {
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(3)//2 + 1
.expectNext(7)//4 + 3
.expectComplete()
.verify();
}
evenNumbers
からペアリングする要素が残っていないため、
oddNumbers
Publisherからの要素5は無視されます。
3.9. z
_ipWith()
_
zipWith
は
zip
と同じメソッドを実行しますが、2人のパブリッシャでのみ実行されます。
@Test
public void givenFluxes__whenZipWithIsInvoked__thenZipWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers, (a, b) -> a ** b);
StepVerifier.create(fluxOfIntegers)
.expectNext(2) //2 ** 1
.expectNext(12)//4 ** 3
.expectComplete()
.verify();
}
4結論
このクイックチュートリアルでは、
Publishers.
を組み合わせる複数の方法を発見しました。
いつものように、例はhttps://github.com/eugenp/tutorials/tree/master/reactor-core[GitHubの上に]で利用可能です。