1. 概要

このチュートリアルでは、プロジェクトReactorの基本を使用して、フラックスを作成するためのいくつかのテクニックを学びます。

2. Mavenの依存関係

いくつかの依存関係から始めましょう。 reactor-コアreactor-テストが必要です。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

3. 同期エミッション

Flux を作成する最も簡単な方法は、 Flux# generateです。 このメソッドは、ジェネレーター関数を使用して一連のアイテムを生成します。

しかし、最初に、generateメソッドを示すメソッドを保持するクラスを定義しましょう。

public class SequenceGenerator {
    // methods that will follow
}

3.1. 新しい状態のジェネレータ

Reactorを使用してフィボナッチ数列を生成する方法を見てみましょう。

public Flux<Integer> generateFibonacciWithTuples() {
    return Flux.generate(
            () -> Tuples.of(0, 1),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            }
    );
}

このgenerateメソッドが引数としてCallableBiFunctionの2つの関数をとることを理解するのは難しいことではありません。

  • Callable 関数は、ジェネレーターの初期状態を設定します。この場合、要素0および1を持つTuplesです。
  • BiFuntion 関数はジェネレーターであり、 SynchronousSink を消費し、シンクのnextメソッドと現在の状態を使用して各ラウンドでアイテムを発行します。

その名前が示すように、SynchronousSinkオブジェクトは同期的に機能します。 ただし、ジェネレーターの呼び出しごとにこのオブジェクトの次のメソッドを複数回呼び出すことはできないことに注意してください。

StepVerifierを使用して生成されたシーケンスを確認しましょう。

@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);

    StepVerifier.create(fibonacciFlux)
      .expectNext(0, 1, 1, 2, 3)
      .expectComplete()
      .verify();
}

この例では、サブスクライバーは5つのアイテムのみを要求するため、生成されたシーケンスは番号3で終了します。

ご覧のとおり、ジェネレーターは、次のパスで使用される新しい状態オブジェクトを返します。 ただし、そうする必要はありません。代わりに、ジェネレーターのすべての呼び出しに状態インスタンスを再利用できます。

3.2. 可変状態のジェネレータ

リサイクルされた状態でフィボナッチ数列を生成するとします。 このユースケースを示すために、最初にクラスを定義しましょう。

public class FibonacciState {
    private int former;
    private int latter;

    // constructor, getters and setters
}

このクラスのインスタンスを使用して、ジェネレーターの状態を保持します。 このインスタンスの2つのプロパティformerlatterは、シーケンス内に2つの連続した番号を格納します。

最初の例を変更すると、generateで可変状態を使用するようになります。

public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
    return Flux.generate(
      () -> new FibonacciState(0, 1),
      (state, sink) -> {
        sink.next(state.getFormer());
        if (state.getLatter() > limit) {
            sink.complete();
        }
        int temp = state.getFormer();
        state.setFormer(state.getLatter());
        state.setLatter(temp + state.getLatter());
        return state;
    });
}

前の例と同様に、この generateバリアントには、状態のサプライヤーとジェネレーターのパラメーターがあります。

タイプCallableの状態サプライヤは、0および1の初期プロパティを持つFibonacciStateオブジェクトを作成するだけです。 この状態オブジェクトは、ジェネレーターのライフサイクル全体で再利用されます。

Fibonacci-with- Tuplesの例のSynchronousSinkと同様に、ここのシンクはアイテムを1つずつ生成します。 ただし、その例とは異なり、ジェネレーターは呼び出されるたびに同じ状態オブジェクトを返します。

また、今回は、無限のシーケンスを回避するために、生成された値が制限に達したときに完了するようにシンクに指示します。

そして、それが機能することを確認するために、もう一度簡単なテストを行いましょう。

@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();

    StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
      .expectNext(0, 1, 1, 2, 3, 5, 8)
      .expectComplete()
      .verify();
}

3.3. ステートレスバリアント

The 生成メソッドは持っています別のバリアントタイプのパラメータが1つだけ消費者 。 このバリアントは、事前に決定されたシーケンスを生成する場合にのみ適しているため、それほど強力ではありません。 それでは、それについては詳しく説明しません。

4. 非同期エミッション

フラックスをプログラムで作成するための解決策は、同期放出だけではありません。

代わりに、作成演算子とプッシュ演算子を使用して、非同期の方法で放出のラウンドで複数のアイテムを生成できます。

4.1. createメソッド

createメソッドを使用すると、複数のスレッドからアイテムを作成できます。 この例では、2つの異なるソースから要素を1つのシーケンスに収集します。

まず、creategenerateと少し異なることを見てみましょう。

public class SequenceCreator {
    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

generate 演算子とは異なり、 createメソッドは状態を維持しません。そして、このメソッドに渡されるエミッターは、外部ソースから要素を受け取ります。 。

また、create演算子がSynchronousSinkではなくFluxSinkを要求していることがわかります。 FluxSink を使用すると、必要な回数だけnext()を呼び出すことができます。

この例では、アイテムのリストにあるすべてのアイテムに対して next()を呼び出し、それぞれを1つずつ発行します。 アイテムにデータを入力する方法をすぐに説明します。

この場合の外部ソースは架空のconsumerフィールドですが、これは代わりに監視可能なAPIである可能性があります。

create 演算子を実行して、2つの数字のシーケンスから始めましょう。

@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
    List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();

    // other statements described below
}

これらのシーケンスsequence1およびsequence2は、生成されたシーケンスのアイテムのソースとして機能します。

次に、パブリッシャーに要素を注入する2つのThreadオブジェクトがあります。

SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence2)
);

accept 演算子が呼び出されると、要素がシーケンスソースに流れ始めます。

次に、新しい統合されたシーケンスをリッスンするか、サブスクライブできます。

List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);

シーケンスをサブスクライブすることにより、シーケンスによって発行された各アイテムで何が発生するかを示します。 ここでは、異なるソースからの各アイテムを統合リストに追加します。

ここで、アイテムが2つの異なるスレッド上を移動するのを確認するプロセス全体をトリガーします。

producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();

いつものように、最後のステップは操作の結果を確認することです:

assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);

受信したシーケンスの最初の3つの番号は、 sequence1 からのものであり、最後の4つはsequence2からのものです。 非同期操作の性質上、これらのシーケンスの要素の順序は保証されません。

create メソッドには別のバリアントがあり、タイプOverflowStrategyの引数を取ります。 その名前が示すように、この引数は、ダウンストリームがパブリッシャーに追いつけない場合のバックプレッシャーを管理します。 デフォルトでは、このような場合、パブリッシャーはすべての要素をバッファリングします。

4.2. プッシュメソッド

create 演算子に加えて、 Flux クラスには、シーケンスを非同期的に発行する別の静的メソッド、つまりpushがあります。 このメソッドは、 create と同じように機能しますが、一度に1つの生成スレッドのみが信号を送信できる点が異なります。

先ほど行った例のcreateメソッドをpushに置き換えることができますが、コードは引き続きコンパイルされます。

ただし、 push 演算子により、 FluxSink#next が異なるスレッドで同時に呼び出されないため、アサーションエラーが発生することがあります。 結果として、複数のスレッドを使用する予定がない場合にのみプッシュを使用する必要があります。

5. シーケンスの処理

これまで見てきたすべてのメソッドは静的であり、特定のソースからシーケンスを作成できます。 Flux APIは、パブリッシャーによって生成されたシーケンスを処理するための、handleという名前のインスタンスメソッドも提供します。

このhandle演算子はシーケンスを実行し、いくつかの処理を実行し、場合によってはいくつかの要素を削除します。 この点で、ハンドル演算子はマップやフィルターのように機能すると言えます。

handleメソッドの簡単な図を見てみましょう。

public class SequenceHandler {
    public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
        return sequence.handle((number, sink) -> {
            if (number % 2 == 0) {
                sink.next(number / 2);
            }
        });
    }
}

この例では、 handle 演算子は一連の数値を取り、値が偶数の場合は2で除算します。 値が奇数の場合、演算子は何もしません。つまり、そのような数値は無視されます。

もう1つの注意点は、 generate メソッドと同様に、 handleはSynchronousSinkを採用し、1つずつエミッションのみを有効にすることです。

そして最後に、物事をテストする必要があります。 最後にもう一度StepVerifierを使用して、ハンドラーが機能することを確認しましょう。

@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
    SequenceHandler sequenceHandler = new SequenceHandler();
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);

    StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
      .expectNext(0, 1, 4, 17)
      .expectComplete()
      .verify();
}

フィボナッチ数列の最初の10項目には、 0 2 8 、および 34 の4つの偶数があります。したがって、 expectedNextメソッドに渡す引数。

6. 結論

この記事では、プログラムでシーケンスを生成するために使用できる Flux APIのさまざまなメソッド、特にgenerateおよびcreate演算子について説明しました。 。

このチュートリアルのソースコードは、GitHubから入手できます。 これはMavenプロジェクトであり、そのまま実行できるはずです。