1. 序章

このチュートリアルでは、 SpringWebFluxで記述されたリアクティブプログラムの並行性について説明します。

まず、リアクティブプログラミングに関連する並行性について説明します。 その後、SpringWebFluxがさまざまなリアクティブサーバーライブラリに対して同時実行の抽象化を提供する方法について説明します。

2. リアクティブプログラミングの動機

典型的なWebアプリケーションは、いくつかの複雑で相互作用するパーツで構成されています。 これらの相互作用の多くは本質的にブロックされています。たとえば、データをフェッチまたは更新するためのデータベース呼び出しを伴うものです。 ただし、他のいくつかは独立しており、同時に実行できます。場合によっては並行して実行できます。

たとえば、Webサーバーへの2つのユーザー要求を異なるスレッドで処理できます。 マルチコアプラットフォームでは、これには全体的な応答時間の点で明らかな利点があります。 したがって、この同時実行モデルは、要求ごとのスレッドモデルとして知られています。

上の図では、各スレッドが一度に1つのリクエストを処理します。

スレッドベースの同時実行性は問題の一部を解決しますが、単一スレッド内のほとんどの対話がまだブロックされているという事実に対処することはできません。 さらに、Javaで並行性を実現するために使用するネイティブスレッドには、コンテキストスイッチの点でかなりのコストがかかります。

一方、Webアプリケーションがますます多くの要求に直面するにつれて、要求ごとのスレッドモデルは期待を下回り始めます

したがって、必要なのは、比較的少ないスレッド数でますます多くのリクエストを処理するのに役立つ同時実行モデルです。 これは、リアクティブプログラミングを採用する主な動機の1つです。

3. リアクティブプログラミングにおける並行性

リアクティブプログラミングは、データフローとそれらを介した変更の伝播の観点からプログラムを構造化するのに役立ちます。 したがって、完全に非ブロッキング環境では、これにより、より優れたリソース使用率でより高い同時実行性を実現できます。

しかし、リアクティブプログラミングはスレッドベースの並行性から完全に逸脱していますか? これは強力な主張ですが、リアクティブプログラミングは、並行性を実現するためのスレッドの使用に対して、確かに非常に異なるアプローチを採用しています。 したがって、リアクティブプログラミングがもたらす根本的な違いは非同期性です。

つまり、プログラムフローは、一連の同期操作から非同期のイベントストリームに変換されます。

たとえば、リアクティブモデルでは、データベースへの読み取り呼び出しは、データのフェッチ中に呼び出し元のスレッドをブロックしません。 呼び出しは、他のユーザーがサブスクライブできるパブリッシャーをすぐに返します。 サブスクライバーは、イベントが発生した後にイベントを処理でき、さらにイベント自体を生成できます。

とりわけ、リアクティブプログラミングは、どのスレッドイベントを生成および消費する必要があるかを強調しません。 強調は、むしろ、プログラムを非同期イベントストリームとして構造化することです

ここでのパブリッシャーとサブスクライバーは、同じスレッドの一部である必要はありません。 これにより、使用可能なスレッドの使用率が向上し、全体的な同時実行性が向上します。

4. イベントループ

並行性へのリアクティブなアプローチを説明するいくつかのプログラミングモデルがあります

このセクションでは、それらのいくつかを調べて、リアクティブプログラミングがより少ないスレッドでより高い並行性を実現する方法を理解します。

サーバー用のそのようなリアクティブ非同期プログラミングモデルの1つは、イベントループモデルです。

上記は、イベントループの抽象的な設計であり、リアクティブ非同期プログラミングのアイデアを示しています。

  • イベントループはシングルスレッドで継続的に実行されますが、使用可能なコアの数と同じ数のイベントループを持つことができます
  • イベントループはイベントキューからのイベントを順番に処理し、コールバックプラットフォームに登録するとすぐにに戻ります。
  • プラットフォームは、データベース呼び出しや外部サービス呼び出しなどの操作の完了をトリガーできます
  • イベントループは、操作完了通知でコールバックをトリガーし、結果を元の呼び出し元に送り返すことができます

イベントループモデルは、 Node.js Netty Ngnixなどの多くのプラットフォームに実装されています。 。 これらは、 Apache HTTPサーバー Tomcat 、またはJBossなどの従来のプラットフォームよりもはるかに優れたスケーラビリティを提供します。

5. SpringWebFluxを使用したリアクティブプログラミング

これで、Spring WebFluxの主題を探求するために、リアクティブプログラミングとその並行性モデルについて十分な洞察が得られました。

WebFluxはです SpringリアクティブスタックWebフレームワーク。バージョン5.0で追加されました。

Spring WebFluxのサーバー側スタックを調べて、Springの従来のWebスタックをどのように補完するかを理解しましょう。

ご覧のとおり、 Spring WebFluxは、Springの従来のWebフレームワークと並行して配置されており、必ずしもに置き換わるわけではありません。

ここで注意すべき重要な点がいくつかあります。

  • Spring WebFluxは、機能ルーティングを使用して従来のアノテーションベースのプログラミングモデルを拡張します
  • さらに、基盤となるHTTPランタイムを Reactive Streams API に適合させ、ランタイムを相互運用可能にします
  • したがって、Tomcat、Reactor、Netty、Undertowなどのサーブレット3.1以降のコンテナを含むさまざまなリアクティブランタイムをサポートできます。
  • 最後に、 WebClient が含まれています。これは、機能的で流暢なAPIを提供するHTTPリクエスト用のリアクティブで非ブロッキングのクライアントです。

6. サポートされているランタイムでのスレッドモデル

前に説明したように、リアクティブプログラムは、ほんの数スレッドで動作し、それらを最大限に活用する傾向があります。 ただし、スレッドの数と性質は、選択した実際のReactiveStreamAPIランタイムによって異なります。

明確にするために、 Spring WebFluxは、HttpHandlerによって提供される共通のAPIを介してさまざまなランタイムに適応できます。 このAPIは、Reactor Netty、Servlet 3.1 API、UndertowAPIなどのさまざまなサーバーAPIを抽象化する1つのメソッドのみを使用した単純なコントラクトです。

それらのいくつかに実装されているスレッドモデルを理解しましょう。

NettyはWebFluxアプリケーションのデフォルトサーバーですが、サポートされている他のサーバーに切り替えるための適切な依存関係を宣言するだけです

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

Java仮想マシンで作成されたスレッドをさまざまな方法で監視することは可能ですが、Threadクラス自体からスレッドをプルするだけで非常に簡単です。

Thread.getAllStackTraces()
  .keySet()
  .stream()
  .collect(Collectors.toList());

6.1. Reactor Netty

すでに述べたように、 Reactor Netty は、SpringBootWebFluxスターターのデフォルトの組み込みサーバーです。 Nettyがデフォルトで作成するスレッドを見てみましょう。 したがって、最初は、他の依存関係を追加したり、WebClientを使用したりすることはありません。 したがって、SpringBootスターターを使用して作成されたSpring WebFluxアプリケーションを起動すると、作成されるデフォルトのスレッドがいくつか表示されることが期待できます。

サーバーの通常のスレッドとは別に、Nettyはリクエスト処理のために一連のワーカースレッドを生成することに注意してください。 これらは通常、使用可能なCPUコアを超えません。これはクアッドコアマシンでの出力です。 また、JVM環境に典型的なハウスキーピングスレッドもたくさんありますが、ここでは重要ではありません。

Nettyは、イベントループモデルを使用して、リアクティブな非同期方式で高度にスケーラブルな同時実行性を提供します。 :

ここで、 EventLoopGroupは、継続的に実行されている必要がある1つ以上のEventLoopを管理します。 したがって、使用可能なコアの数よりも多くのEventLoopsを作成することはお勧めしません。

EventLoopGroup は、新しく作成された各ChannelEventLoopをさらに割り当てます。 したがって、チャネルの存続期間中、すべての操作は同じスレッドによって実行されます。

6.2. Apache Tomcat

Spring WebFluxは、 ApacheTomcatなどの従来のサーブレットコンテナでもサポートされています。

WebFluxは、ノンブロッキングI /Oを備えたサーブレット3.1APIに依存しています。 低レベルアダプタの背後でサーブレットAPIを使用しますが、サーブレットAPIを直接使用することはできません。

Tomcatで実行されているWebFluxアプリケーションにどのような種類のスレッドが期待されるかを見てみましょう。

ここに表示されるスレッドの数とタイプは、以前に観察したものとはかなり異なります。

まず、 Tomcatは、より多くのワーカースレッドから開始します。デフォルトは10です。 もちろん、JVMに典型的なハウスキーピングスレッドと、この説明では無視できるCatalinaコンテナも表示されます。

Java NIOを使用したTomcatのアーキテクチャを理解して、上記のスレッドと相関させましょう。

Tomcat 5以降は、コネクタコンポーネントでNIOをサポートします。コネクタコンポーネントは、主にリクエストの受信を担当します

もう1つのTomcatコンポーネントは、コンテナ管理機能を担当するContainerコンポーネントです。

ここで私たちが関心を持っているのは、ConnectorコンポーネントがNIOをサポートするために実装するスレッドモデルです。 これは、 NioEndpoint モジュールの一部として、 Acceptor Poller、、およびWorkerで構成されています。

Tomcatは、Acceptor、Poller、およびWorker用に1つ以上のスレッドを生成し、通常はWorker専用のスレッドプールを使用します。

Tomcatアーキテクチャに関する詳細な説明はこのチュートリアルの範囲を超えていますが、以前に見たスレッドを理解するのに十分な洞察が得られるはずです。

7. WebClientのスレッドモデル

WebClient は、SpringWebFluxの一部であるリアクティブHTTPクライアントです。 エンドツーエンドリアクティブのアプリケーションを作成できるRESTベースの通信が必要な場合はいつでも使用できます。

これまで見てきたように、リアクティブアプリケーションはほんの数スレッドで動作するため、アプリケーションのどの部分でもスレッドをブロックする余地はありません。 したがって、 WebClient は、WebFluxの可能性を実現する上で重要な役割を果たします。

7.1. WebClientを使用する

WebClientの使用も非常に簡単です。 Spring WebFlux の一部であるため、特定の依存関係を含める必要はありません。

Monoを返す単純なRESTエンドポイントを作成しましょう。

@GetMapping("/index")
public Mono<String> getIndex() {
    return Mono.just("Hello World!");
}

次に、 WebClient を使用してこのRESTエンドポイントを呼び出し、データを事後的に消費します。

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .doOnNext(s -> printThreads());

ここでは、前に説明した方法を使用して作成されたスレッドも印刷しています。

7.2. スレッディングモデルを理解する

では、 WebClient の場合、スレッドモデルはどのように機能しますか?

当然のことながら、WebClientはイベントループモデルを使用して同時実行性も実装しています。 もちろん、必要なインフラストラクチャを提供するために、基盤となるランタイムに依存しています。

Reactor NettyでWebClientを実行している場合、Nettyがサーバーに使用するイベントループを共有します。 したがって、この場合、作成されるスレッドに大きな違いは見られない可能性があります。

ただし、 WebClientは、Jettyなどのサーブレット3.1以降のコンテナでもサポートされていますが、その動作方法は異なります

Jetty を実行しているWebFluxアプリケーションで作成されたスレッドを、 WebClient がある場合とない場合で比較すると、いくつかの追加スレッドがあります。

ここで、WebClientイベントループを作成する必要があります。 したがって、このイベントループが作成する固定数の処理スレッドを確認できます。

場合によっては、 クライアントとサーバー用に別々のスレッドプールを使用すると、パフォーマンスが向上する可能性があります。 Nettyのデフォルトの動作ではありませんが、必要に応じてWebClientの専用スレッドプールを宣言することはいつでも可能です。

これがどのように可能になるかについては、後のセクションで説明します。

8. データアクセスライブラリのスレッドモデル

前に見たように、単純なアプリケーションでさえ、通常、接続する必要のあるいくつかの部分で構成されています。

これらの部分の典型的な例には、データベースやメッセージブローカーが含まれます。 それらの多くに接続するための既存のライブラリはまだブロックされていますが、それは急速に変化しています。

接続用のリアクティブライブラリを提供するデータベースがいくつかありますこれらのライブラリの多くはSpringData内で利用できますが、他のライブラリを直接使用することもできます。

これらのライブラリが使用するスレッドモデルは、私たちにとって特に興味深いものです。

8.1. Spring Data MongoDB

Spring Data MongoDB は、 MongoDBReactiveStreamsドライバー上に構築されたMongoDBのリアクティブリポジトリサポートを提供します。 最も注目すべきは、このドライバーがReactive Streams APIを完全に実装して、ノンブロッキングバックプレッシャーで非同期ストリーム処理を提供することです。

Spring BootアプリケーションでMongoDBのリアクティブリポジトリのサポートを設定するのは、依存関係を追加するのと同じくらい簡単です。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

これにより、リポジトリを作成し、それを使用してMongoDBでいくつかの基本的な操作を非ブロッキング方式で実行できるようになります。

public interface PersonRepository extends ReactiveMongoRepository<Person, ObjectId> {
}
.....
personRepository.findAll().doOnComplete(this::printThreads);

では、このアプリケーションをNettyサーバーで実行すると、どのような種類のスレッドが表示されると予想できますか?

当然のことながら、 a Spring Dataリアクティブリポジトリはサーバーで使用可能なものと同じイベントループを使用するため、大きな違いは見られません。

8.2. リアクターカフカ

Springは、リアクティブKafkaの本格的なサポートを構築中です。ただし、Spring以外でも利用できるオプションがあります。

Reactor Kafkaは、Reactorに基づくKafkaのリアクティブAPIです。 Reactor Kafkaを使用すると、機能APIを使用してメッセージを公開および使用でき、ノンブロッキングバックプレッシャも使用できます。

まず、Reactor Kafkaの使用を開始するには、アプリケーションに必要な依存関係を追加する必要があります。

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.3.10</version>
</dependency>

これにより、ブロックされない方法でKafkaへのメッセージを生成できるようになります。

// producerProps: Map of Standard Kafka Producer Configurations
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender =  KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux
  .range(1, 10)
  .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux).subscribe();

同様に、Kafkaからのメッセージも、ブロックしない方法で使用できるはずです。

// consumerProps: Map of Standard Kafka Consumer Configurations
ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
inboundFlux.doOnComplete(this::printThreads)

これは非常にシンプルで自明です。

Kafkaでトピックreactive-testをサブスクライブし、Fluxのメッセージを取得しています。

私たちにとって興味深いのは、作成されるスレッドです

Nettyサーバーに典型的ではないいくつかのスレッドを見ることができます。

これは、Reactor Kafkaが、Kafkaメッセージ処理に排他的に参加する独自のスレッドプールをいくつかのワーカースレッドで管理していることを示しています。 もちろん、Nettyと無視できるJVMに関連する他のスレッドがたくさん表示されます。

Kafkaプロデューサーは、ブローカーにリクエストを送信するために別のネットワークスレッドを使用します。さらに、シングルスレッドプールスケジューラでアプリケーションに応答を配信します。

一方、Kafkaコンシューマーには、コンシューマーグループごとに1つのスレッドがあり、着信メッセージのリッスンをブロックします。 次に、着信メッセージは別のスレッドプールで処理するようにスケジュールされます。

9. WebFluxのスケジュールオプション

これまでのところ、リアクティブプログラミングは、ほんの数スレッドの完全にノンブロッキングの環境で本当に輝いています。 ただし、これは、実際にブロックしている部分がある場合、パフォーマンスが大幅に低下することも意味します。 これは、ブロック操作によってイベントループが完全にフリーズする可能性があるためです。

では、リアクティブプログラミングで長時間実行されるプロセスやブロッキング操作をどのように処理するのでしょうか?

正直なところ、最善の選択肢はそれらを避けることです。 ただし、これが常に可能であるとは限らず、アプリケーションのこれらの部分に専用のスケジューリング戦略が必要になる場合があります

Spring WebFlux は、データフローチェーン間で処理を別のスレッドプールに切り替えるメカニズムを提供します。 これにより、特定のタスクに必要なスケジューリング戦略を正確に制御できます。 もちろん、 WebFlux は、基盤となるリアクティブライブラリで利用可能なスケジューラと呼ばれるスレッドプールの抽象化に基づいてこれを提供できます。

9.1. 原子炉

Reactor では、 Schedulerクラスが実行モデルと、実行が行われる場所を定義します。

スケジューラークラスは、即時シングルエラスティックパラレルなどの多数の実行コンテキストを提供します。

これらは、さまざまなジョブに役立つさまざまなタイプのスレッドプールを提供します。 さらに、既存のExecutorServiceを使用して独自のSchedulerをいつでも作成できます。

スケジューラーはいくつかの実行コンテキストを提供しますが、Reactorは実行コンテキストを切り替えるさまざまな方法も提供します。 それらはメソッドpublishOnsubscribeOnです。

publishOnSchedulerとともにチェーン内のどこでも使用でき、そのSchedulerは後続のすべてのオペレーターに影響します。

subscribeOnSchedulerとともにチェーン内のどこでも使用できますが、これは放出元のコンテキストにのみ影響します。

思い出してください。NettyのWebClientは、デフォルトの動作としてサーバー用に作成されたものと同じイベントループを共有します。 ただし、WebClient専用のスレッドプールを作成する正当な理由がある場合があります。

WebFluxのデフォルトのリアクティブライブラリであるReactorでこれを実現する方法を見てみましょう。

Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .publishOn(scheduler)
  .doOnNext(s -> printThreads());

以前は、WebClientの有無にかかわらずNettyで作成されたスレッドに違いは見られませんでした。 ただし、上記のコードを実行すると、いくつかの新しいスレッドが作成されます

ここでは、境界付きエラスティックスレッドプールの一部として作成されたスレッドを確認できます。 WebClient からの応答は、サブスクライブすると公開される場所です。

これにより、サーバー要求を処理するためのメインスレッドプールが残ります。

9.2. RxJava

RxJavaのデフォルトの動作は、Reactorの動作とそれほど変わりません。

Observable と、それに適用する一連のオペレーターは、サブスクリプションが呼び出されたのと同じスレッドで作業を行い、オブザーバーに通知します。 また、 RxJava は、Reactorと同様に、プレフィックス付きまたはカスタムのスケジューリング戦略をチェーンに導入する方法を提供します。

RxJavaはクラスSchedulersも備えており、Observableチェーンの多数の実行モデルを提供します。 これらには、新しいスレッド即時トランポリン io 計算、およびテストが含まれます。 ]。 もちろん、Java ExecutorからSchedulerを定義することもできます。

さらに、RxJavaは、これを実現するための2つの拡張メソッド、subscribeOnおよびobserveOnも提供します。

subscribeOn メソッドは、Observableが動作する別のSchedulerを指定することにより、デフォルトの動作を変更します。

一方、 observeOn メソッドは、Observableがオブザーバーに通知を送信するために使用できる別のスケジューラーを指定します。

前に説明したように、SpringWebFluxはデフォルトでリアクティブライブラリとしてReactorを使用します。 ただし、Reactive Streams APIと完全に互換性があるため、 RxJava (ReactiveStreamsアダプターを備えたRxJava1.xの場合)などの別のReactiveStreams実装に切り替えることができます。

依存関係を明示的に追加する必要があります。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

次に、RxJava固有のスケジューラーとともに、アプリケーションでObservableのようなRxJavaタイプの使用を開始できます。

io.reactivex.Observable
  .fromIterable(Arrays.asList("Tom", "Sawyer"))
  .map(s -> s.toUpperCase())
  .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
  .doOnComplete(this::printThreads);

その結果、このアプリケーションを実行すると、通常のNettyおよびJVM関連のスレッドとは別に、RxJavaスケジューラに関連するいくつかのスレッドが表示されます。

10. 結論

この記事では、並行性のコンテキストからリアクティブプログラミングの前提を探りました。

従来のプログラミングとリアクティブプログラミングの並行性モデルの違いを観察しました。 これにより、Spring WebFluxの同時実行モデルと、それを実現するためのスレッドモデルの採用を検討することができました。

さらに、さまざまなHTTPランタイムおよびリアクティブライブラリと組み合わせて、WebFluxのスレッドモデルを調査しました。

また、WebClientまたはデータアクセスライブラリを使用した場合のスレッドモデルの違いについても説明しました。

最後に、WebFlux内のリアクティブプログラムでスケジューリング戦略を制御するためのオプションに触れました。

いつものように、この記事のソースコードはGitHubにあります。