1. 序章

Spring WebFlux は、Webアプリケーションにリアクティブプログラミングを提供します。 リアクティブデザインの非同期で非ブロッキングの性質により、パフォーマンスとメモリ使用量が向上します。 Project Reactor は、データストリームを効率的に管理するための機能を提供します。

ただし、これらの種類のアプリケーションでは、背圧が一般的な問題です。 このチュートリアルでは、それが何であるか、およびSpringWebFluxでバックプレッシャメカニズムを適用してそれを軽減する方法について説明します。

2. 反応性ストリームの背圧

リアクティブプログラミングの非ブロッキングの性質により、サーバーは完全なストリームを一度に送信しません。 利用可能になるとすぐにデータを同時にプッシュできます。 したがって、クライアントはイベントを受信して処理するために待機する時間が短くなります。 しかし、克服すべき問題があります。

ソフトウェアシステムのバックプレッシャは、トラフィック通信を過負荷にする機能です。 言い換えれば、情報の発信者は、処理できないデータで消費者を圧倒します。

最終的に、人々はこの用語を、それを制御および処理するためのメカニズムとしても適用します。 これは、下流の力を制御するためにシステムによって実行される保護アクションです。

2.1. 背圧とは何ですか?

リアクティブストリームでは、 背圧は、ストリーム要素の送信を調整する方法も定義します。 つまり、受信者が消費できる要素の数を制御します。

例を使用して、それが何であるかを明確に説明しましょう。

  • システムには、パブリッシャー、コンシューマー、およびグラフィカルユーザーインターフェイス(GUI)の3つのサービスが含まれています。
  • パブリッシャーは1秒あたり10000イベントをコンシューマーに送信します
  • コンシューマーはそれらを処理し、結果をGUIに送信します
  • GUIは結果をユーザーに表示します
  • コンシューマーは1秒あたり7500イベントしか処理できません

この速度では、コンシューマーはイベントを管理できません( バックプレッシャ)。 その結果、システムが崩壊し、ユーザーには結果が表示されなくなります。

2.2. 全身障害を防ぐための背圧の使用

ここでの推奨事項は、システム障害を防ぐために、ある種のバックプレッシャ戦略を適用することです。 目的は、受け取った追加のイベントを効率的に管理することです。

  • 制御送信されるデータストリームが最初のオプションになります。 基本的に、発行者はイベントのペースを遅くする必要があります。 したがって、コンシューマーが過負荷になることはありません。 残念ながら、これは常に可能であるとは限らず、他の利用可能なオプションを見つける必要があります
  • 余分な量のデータをバッファリングすることが2番目の選択肢です。 このアプローチでは、コンシューマーは残りのイベントを処理できるようになるまで一時的に保存します。 ここでの主な欠点は、バッファのバインドを解除してメモリをクラッシュさせることです
  • 余分なイベントをドロップしてそれらを追跡できなくなります。 このソリューションでさえ理想からはほど遠いです。このテクニックを使えば、システムは崩壊しません。

 

2.3. 背圧の制御

パブリッシャーによって発行されたイベントの制御に焦点を当てます。 基本的に、従うべき3つの戦略があります。

  • サブスクライバーが要求した場合にのみ新しいイベントを送信します。 これは、エミッター要求で要素を収集するためのプル戦略です。
  • クライアント側で受信するイベントの数を制限します。 限定的なプッシュ戦略として機能するパブリッシャーは、一度に最大量のアイテムのみをクライアントに送信できます
  • コンシューマーがそれ以上のイベントを処理できない場合のデータストリーミングのキャンセル。 この場合、受信者はいつでも送信を中止し、後で再びストリームにサブスクライブできます。

 

3. SpringWebFluxでの背圧の処理

Spring WebFluxは、リアクティブストリームの非同期ノンブロッキングフローを提供します。 Spring WebFlux内のバックプレッシャの原因は、 ProjectReactorです。 内部でフラックス機能を使用して、エミッターによって生成されるイベントを制御するメカニズムを適用します。

WebFluxは、TCPフロー制御を使用してバックプレッシャをバイト単位で調整します。 ただし、コンシューマーが受け取ることができる論理要素は処理しません。 内部で発生する相互作用の流れを見てみましょう。

  • WebFluxフレームワークは、TCPを介してイベントを転送/受信するために、イベントをバイトに変換する役割を果たします。
  • 次の論理要素を要求する前に、コンシューマーが長時間実行するジョブを開始する場合があります。
  • 受信者がイベントを処理している間、新しいイベントの要求がないため、WebFluxは確認応答なしでバイトをエンキューします
  • TCPプロトコルの性質上、新しいイベントが発生した場合、パブリッシャーはそれらをネットワークに送信し続けます

 

結論として、上の図は、論理要素の需要が消費者と発行者で異なる可能性があることを示しています。 Spring WebFluxは、システム全体として相互作用するサービス間のバックプレッシャーを理想的に管理しません。 それは、消費者と独立して、次に発行者と同じ方法でそれを処理します。 ただし、2つのサービス間の論理的な需要は考慮されていません。

したがって、 Spring WebFluxは、予想どおりの背圧を処理しません。 次のセクションで、SpringWebFluxにバックプレッシャメカニズムを実装する方法を見てみましょう。

4. SpringWebFluxを使用した背圧メカニズムの実装

Flux実装を使用して、受信したイベントの制御を処理します。 したがって、読み取り側と書き込み側でバックプレッシャサポートを使用して要求と応答の本文を公開します。 次に、プロデューサーは、コンシューマーの容量が解放されるまで、速度を落とすか停止します。 それを行う方法を見てみましょう!

4.1. 依存関係

例を実装するには、SpringWebFluxスターターおよびReactorテストの依存関係をpom.xmlに追加するだけです。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

4.2. リクエスト

最初のオプションは消費者が処理できるイベントを制御できるようにすることです。 したがって、パブリッシャーは、レシーバーが新しいイベントを要求するまで待機します。 要約すると、クライアントは Flux にサブスクライブし、その要求に基づいてイベントを処理します。

@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
    Flux request = Flux.range(1, 50);

    request.subscribe(
      System.out::println,
      err -> err.printStackTrace(),
      () -> System.out.println("All 50 items have been successfully processed!!!"),
      subscription -> {
          for (int i = 0; i < 5; i++) {
              System.out.println("Requesting the next 10 elements!!!");
              subscription.request(10);
          }
      }
    );

    StepVerifier.create(request)
      .expectSubscription()
      .thenRequest(10)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .thenRequest(10)
      .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
      .thenRequest(10)
      .expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
      .thenRequest(10)
      .expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
      .thenRequest(10)
      .expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
      .verifyComplete();

このアプローチでは、エミッターがレシーバーを圧倒することはありません。 つまり、クライアントは必要なイベントを処理するための制御下にあります。

StepVerifier を使用して、背圧に関するプロデューサーの動作をテストします。 thenRequest(n)が呼び出された場合にのみ、次のn個のアイテムが期待されます。

4.3. 制限

2番目のオプションは、ProjectReactorのlimitRange()演算子を使用することです。 一度にプリフェッチするアイテム数を設定できます。 興味深い機能の1つは、サブスクライバーが処理するイベントをさらに要求した場合でも制限が適用されることです。 エミッターはイベントをチャンクに分割し、各リクエストの制限を超えて消費することを回避します。

@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
    Flux<Integer> limit = Flux.range(1, 25);

    limit.limitRate(10);
    limit.subscribe(
      value -> System.out.println(value),
      err -> err.printStackTrace(),
      () -> System.out.println("Finished!!"),
      subscription -> subscription.request(15)
    );

    StepVerifier.create(limit)
      .expectSubscription()
      .thenRequest(15)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .expectNext(11, 12, 13, 14, 15)
      .thenRequest(10)
      .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
      .verifyComplete();
}

4.4. キャンセル

最後に、コンシューマーは、いつでも受信するイベントをキャンセルできます。 この例では、別のアプローチを使用します。 Project Reactorを使用すると、独自の Subscriber を実装したり、BaseSubscriberを拡張したりできます。 それでは、受信者が言及されたクラスをオーバーライドするときにいつでも新しいイベントの受信を中止する方法を見てみましょう。

@Test
public void whenCancel_thenSubscriptionFinished() {
    Flux<Integer> cancel = Flux.range(1, 10).log();

    cancel.subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnNext(Integer value) {
            request(3);
            System.out.println(value);
            cancel();
        }
    });

    StepVerifier.create(cancel)
      .expectNext(1, 2, 3)
      .thenCancel()
      .verify();
}

5. 結論

このチュートリアルでは、リアクティブプログラミングのバックプレッシャとそれを回避する方法を示しました。 Spring WebFluxは、ProjectReactorを介してバックプレッシャをサポートします。 したがって、パブリッシャーがあまりにも多くのイベントでコンシューマーを圧倒した場合に、可用性、堅牢性、および安定性を提供できます。 要約すると、需要が高いことによるシステム障害を防ぐことができます。

いつものように、コードはGitHubから入手できます。