Project Reactorを使用したプログラムによるシーケンスの作成

1. 概要

このチュートリアルでは、https://www.baeldung.com/reactor-core [Project Reactor basics]を使用して、https://projectreactor.io/docs/core/release/api/reactorを作成するためのいくつかのテクニックを学習します。 /core/publisher/Flux.html[__Flux__es]。

2. Mavenの依存関係

いくつかの依存関係から始めましょう。 __https://search.maven.org/search?q = g:io.projectreactor%20AND%20a:reactor-core&core = gav [reactor-core] __and https://search.maven.org/searchが必要です。 ?q = g:io.projectreactor%20AND%20a:reactor-test [_reactor-test_]:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

3. 同期放出

_Flux_を作成する最も簡単な方法は、_Flux#generate_です。 *このメソッドは、アイテムのシーケンスを生成するジェネレーター関数に依存しています。*
しかし、最初に、_generate_メソッドを示すメソッドを保持するクラスを定義しましょう。
public class SequenceGenerator {
    // methods that will follow
}

* 3.1。 新しい状態のジェネレーター*

Reactorでhttps://en.wikipedia.org/wiki/Fibonacci_number[Fibonacci sequence]を生成する方法を見てみましょう。
public Flux<Integer> generateFibonacciWithTuples() {
    return Flux.generate(
            () -> Tuples.of(0, 1),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            }
    );
}
これを見るのは難しくありませんhttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#generate-java.util.concurrent.Callable-java.util.function.BiFunction- [_generate_]メソッドは、2つの関数を引数として取ります-_Callable_と_BiFunction_:
  • Callable_関数は、ジェネレーターの初期状態を設定します–
    この場合、それはhttps://projectreactor.io/docs/core/release/api/reactor/util/function/Tuples.html[_Tuples
    ]と要素_0_および_1_です。

  • BiFuntion_関数はジェネレーターであり、
    _SynchronousSink
    、各ラウンドでシンクの_next_メソッドと現在の状態でアイテムを発行

    名前が示すように、_SynchronousSink_オブジェクトは同期的に動作します。 ただし、*このオブジェクトの_next_メソッドをジェネレーターの呼び出しごとに複数回呼び出すことはできません。*
    生成されたシーケンスをlink:/reactive-streams-step-verifier-test-publisher[_StepVerifier_]で確認しましょう:
@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);

    StepVerifier.create(fibonacciFlux)
      .expectNext(0, 1, 1, 2, 3)
      .expectComplete()
      .verify();
}
この例では、サブスクライバーは5つのアイテムのみを要求するため、生成されたシーケンスは番号_3_で終了します。
ご覧のように、* generatorジェネレータは次のパスで使用される新しい状態*オブジェクトを返します。 *ただし、そうする必要はありません。*代わりに、ジェネレータのすべての呼び出しに状態インスタンスを再利用できます。

* 3.2。 可変状態のジェネレーター*

リサイクル状態でフィボナッチ数列を生成するとします。 この使用例を示すために、最初にクラスを定義しましょう:
public class FibonacciState {
    private int former;
    private int latter;

    // constructor, getters and setters
}
このクラスのインスタンスを使用して、ジェネレーターの状態を保持します。 このインスタンスの2つのプロパティ_former_および_latter_は、シーケンス内の2つの連続した番号を格納します。
最初の例を変更する場合、_generate_で可変状態を使用します。
public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
    return Flux.generate(
      () -> new FibonacciState(0, 1),
      (state, sink) -> {
        sink.next(state.getFormer());
        if (state.getLatter() > limit) {
            sink.complete();
        }
        int temp = state.getFormer();
        state.setFormer(state.getLatter());
        state.setLatter(temp + state.getLatter());
        return state;
    });
}
前の例と同様に、このhttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#generate-java.util.concurrent.Callable-java.util.function.BiFunction -java.util.function.Consumer-[_ generate_ variant]には状態サプライヤーとジェネレーターのパラメーターがあります。
タイプ_Callable_の状態サプライヤーは、_0_および_1_の初期プロパティを持つ_FibonacciState_オブジェクトを作成するだけです。 この状態オブジェクトは、ジェネレーターのライフサイクルを通して再利用されます。
Fibonacci-with-_Tuples_の例の_SynchronousSink_と同様に、ここのシンクはアイテムを1つずつ生成します。 ただし、その例とは異なり、*ジェネレーターは呼び出されるたびに同じ状態オブジェクトを返します*。
また、今回は*無限のシーケンスを避けるために*、生成された値が制限に達したときに完了するようにシンクに指示します。
そして、もう一度簡単なテストを行って、機能することを確認します。
@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();

    StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
      .expectNext(0, 1, 1, 2, 3, 5, 8)
      .expectComplete()
      .verify();
}

* 3.3。 ステートレスバリアント*

_generate_メソッドには、https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#generate-java.util.function.Consumer- [別のバリアント]があり、型のパラメーターは1つだけです。 _Consumer <SynchronousSink> _。 このバリアントは、事前に決定されたシーケンスの生成にのみ適しているため、それほど強力ではありません。 そのため、詳細については説明しません。

4. 非同期放出

_Flux_をプログラムで作成するための解決策は、同期放出だけではありません。
代わりに、_create_および_push_演算子を使用して、複数のアイテムを非同期の方法で放出ラウンドで生成できます。*

* 4.1。 _create_メソッド*

  • createメソッドを使用して、以下からアイテムを生成できます。複数のスレッド。 **この例では、2つの異なるソースから要素をシーケンスに収集します。

    最初に、_create_と_generate_の違いを見てみましょう。
public class SequenceCreator {
    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
    }
}
_generate_演算子とは異なり、* _ create_メソッドは状態を維持しません。*そして、このメソッドに渡されたエミッターは、それ自体でアイテムを生成するのではなく、*外部ソースから要素を受け取ります。*
また、_create_演算子が、_SynchronousSink_の代わりにhttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.html[_FluxSink_]を要求することがわかります。 _FluxSink_を使用すると、必要な回数だけ__next()next__を呼び出すことができます。*
この場合、_items_のリストにあるすべてのアイテムに対して_next()_を呼び出し、それぞれを1つずつ発行します。 __items ___に値を設定する方法については、すぐに説明します。
この場合、外部ソースは架空の_consumer_フィールドですが、これは代わりに監視可能なAPIである可能性があります。
_create_演算子を実行して、2つの数字のシーケンスから始めましょう。
@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
    List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();

    // other statements described below
}
これらのシーケンス_sequence1_および_sequence2_は、生成されたシーケンスのアイテムのソースとして機能します。
次に、パブリッシャーに要素を注ぐ2つの_Thread_オブジェクトがあります。
SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence2)
);
_accept_演算子が呼び出されると、要素がシーケンスソースに流れ始めます。
そして、新しい統合されたシーケンスをリッスン、または_subscribe_できます。
List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);
シーケンスにサブスクライブすることにより、シーケンスによって放出される各アイテムで何が起こるべきかを示します。 ここでは、異なるソースの各アイテムを統合リストに追加します。
ここで、アイテムが2つの異なるスレッドで移動するプロセス全体をトリガーします。
producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();
いつものように、最後の手順は操作の結果を確認することです。
assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);
受信したシーケンスの最初の3つの数字は_sequence1_からのもので、最後の4つの数字は_sequence2_からのものです。 非同期操作の性質により、これらのシーケンスの要素の順序は保証されません。
_create_メソッドにはhttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#create-java.util.function.Consumer-reactor.core.publisher.FluxSink.OverflowStrategy- [別のバリアント]、タイプ_https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.OverflowStrategy.html [OverflowStrategy] _の引数を取る。 その名前が示すように、この引数は、ダウンストリームがパブリッシャーに追い付かない場合にバックプレッシャーを管理します。 *デフォルトでは、パブリッシャーはそのような場合にすべての要素をバッファリングします。*

* 4.2。 _push_メソッド*

_create_演算子に加えて、_Flux_クラスには、シーケンスを非同期的に発行する別の静的メソッド、つまりhttps://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#push-javaがあります.util.function.Consumer-[_ push_]。 このメソッドは、_create_と同じように機能しますが、*一度に1つの生成スレッドのみがシグナルを発行できることを除きます。*
先ほどの例の_create_メソッドを_push_に置き換えても、コードはコンパイルされます
ただし、_push_ operatorが_FluxSink#next_が異なるスレッドで同時に呼び出されるのを防ぐため、アサーションエラーが表示されることがあります。 結果として、*複数のスレッドを使用するつもりがない場合にのみ、_push_を使用する必要があります。*

5. シーケンスの処理

これまで見てきたすべてのメソッドは静的であり、特定のソースからシーケンスを作成できます。 * _Flux_ APIは、_https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#handle-java.util.function.BiConsumer- [handle]という名前のインスタンスメソッドも提供します。 _、発行者が作成したシーケンスを処理します。*
この_handle_演算子はシーケンスを取り、いくつかの処理を行い、場合によってはいくつかの要素を削除します。 この点について、* _ handle_演算子は_map_および_filter_と同じように機能します*。
_handle_メソッドの簡単な図を見てみましょう。
public class SequenceHandler {
    public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
        return sequence.handle((number, sink) -> {
            if (number % 2 == 0) {
                sink.next(number / 2);
            }
        });
    }
}
この例では、_handle_演算子は数値のシーケンスを取り、値が偶数であれば_2_で除算します。 値が奇数の場合、演算子は何もしません。つまり、そのような数値は無視されます。
注意すべきもう1つの点は、_generate_メソッドと同様に、* _ handle_は_SynchronousSink_を使用し、1つずつのエミッションのみを有効にします。*
そして最後に、物事をテストする必要があります。 最後にもう一度_StepVerifier_を使用して、ハンドラーが機能することを確認します。
@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
    SequenceHandler sequenceHandler = new SequenceHandler();
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);

    StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
      .expectNext(0, 1, 4, 17)
      .expectComplete()
      .verify();
}
フィボナッチ数列の最初の10項目には、_0 _、_ 2 _、_ 8_、および_34_の4つの偶数があります。したがって、_expectNext_メソッドに渡す引数です。

6. 結論

この記事では、プログラムを使用してシーケンスを生成するために使用できる_Flux_ APIのさまざまなメソッド、特に_generate_および_create_演算子について説明しました。
このチュートリアルのソースコードは、https://github.com/eugenp/tutorials/tree/master/reactor-core [GitHub上]で入手できます。 これはMavenプロジェクトであり、そのまま実行できるはずです。