1. 概要

このチュートリアルでは、 ProjectReactormapおよびflatMap演算子を紹介します。 これらはMonoクラスとFluxクラスで定義されており、ストリームの処理時にアイテムを変換します。

次のセクションでは、FluxクラスのmapメソッドとflatMapメソッドに焦点を当てます。 Mono クラスの同じ名前のものは、まったく同じように機能します。

2. Mavenの依存関係

いくつかのコード例を書くには、Reactorコア依存関係が必要です。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.9.RELEASE</version>
</dependency>

3. map演算子

それでは、map演算子の使用方法を見てみましょう。

Flux#map メソッドは、単一の Function パラメーターを想定しています。これは、次のように単純にすることができます。

Function<String, String> mapper = String::toUpperCase;

このマッパーは、文字列を大文字のバージョンに変換します。 Fluxストリームに適用できます。

Flux<String> inFlux = Flux.just("baeldung", ".", "com");
Flux<String> outFlux = inFlux.map(mapper);

指定されたマッパーは、入力ストリームの各アイテムを出力の新しいアイテムに変換し、順序を保持します。

それを証明しましょう:

StepVerifier.create(outFlux)
  .expectNext("BAELDUNG", ".", "COM")
  .expectComplete()
  .verify();

map メソッドが呼び出されたときに、マッパー関数が実行されないことに注意してください。 代わりに、ストリームをサブスクライブするときに実行されます。

4. flatMap演算子

flatMap演算子に移りましょう。

4.1. コード例

map と同様に、flatMap演算子にはFunctionタイプの単一のパラメーターがあります。 ただし、 map で動作する関数とは異なり、 flatMap マッパー関数は、入力アイテムを通常のオブジェクトではなくパブリッシャーに変換します。

次に例を示します。

Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split(""));

この場合、マッパー関数は文字列を大文字に変換してから、個別の文字に分割します。 最後に、関数はそれらの文字から新しいストリームを構築します。

これで、指定されたマッパーをflatMapメソッドに渡すことができます。

Flux<String> inFlux = Flux.just("baeldung", ".", "com");
Flux<String> outFlux = inFlux.flatMap(mapper);

これまで見てきたフラットマッピング操作では、3つの文字列アイテムを使用してアップストリームから3つの新しいストリームが作成されます。 その後、これら3つのストリームの要素が分割され、絡み合って別の新しいストリームを形成します。 この最終ストリームには、3つの入力文字列すべての文字が含まれています。

次に、この新しく形成されたストリームをサブスクライブして、パイプラインをトリガーし、出力を確認できます。

List<String> output = new ArrayList<>();
outFlux.subscribe(output::add);
assertThat(output).containsExactlyInAnyOrder("B", "A", "E", "L", "D", "U", "N", "G", ".", "C", "O", "M");

異なるソースからのアイテムのインターリーブにより、出力でのそれらの順序が入力で表示されるものと異なる場合があることに注意してください。

4.2. パイプライン操作の説明

マッパーを定義し、それを flatMap オペレーターに渡し、ストリームでこのオペレーターを呼び出すことを完了しました。 詳細を掘り下げて、出力の項目が故障している理由を確認するときが来ました。

まず、ストリームがサブスクライブされるまで操作が発生しないことを明確にしましょう。 その場合、パイプラインが実行され、flatMapメソッドに渡されたマッパー関数が呼び出されます。

この時点で、マッパーは入力ストリーム内の要素に対して必要な変換を実行します。 これらの各要素は複数のアイテムに変換され、新しいストリームを作成するために使用されます。 コード例では、式の値 Flux.just(s.toUpperCase().split("")) そのようなストリームを示します。

Publisher インスタンスで表される新しいストリームの準備ができると、flatMapは熱心にサブスクライブします。 オペレーターは、パブリッシャーが終了するのを待たずに次のストリームに移動します。つまり、サブスクリプションは非ブロッキングです。

パイプラインはすべての派生ストリームを同時に処理するため、それらのアイテムはいつでも入ってくる可能性があります。 その結果、元の注文は失われます。 アイテムの順序が重要な場合は、代わりにflatMapSequential演算子の使用を検討してください。

5. mapflatMapの違い

これまで、mapおよびflatMap演算子について説明してきました。 それらの間の大きな違いで締めくくりましょう。

5.1. 1対1対。 1対多

マップ演算子はストリーム要素に1対1の変換を適用しますが、flatMapは1対多を実行します。 この区別は、メソッドのシグネチャを見ると明らかです。

  • フラックス map(関数<? スーパーT、? V>マッパーを拡張します) –マッパーはタイプの単一の値を変換します T タイプの単一の値に V
  • フラックス flatMap(Function <? スーパーT、? Publisherを拡張します<? R >>マッパーを拡張します) –マッパーはタイプの単一の値を変換します T出版社タイプの要素の R

機能面では、ProjectReactorのmapflatMap の違いは、mapflatMapの違いに似ていることがわかります。 ] JavaStreamAPIで。

5.2. 同期vs。 非同期

ReactorCoreライブラリのAPI仕様からの2つの抜粋を次に示します。

  • map :各アイテムに同期機能を適用することにより、このFluxによって放出されるアイテムを変換します
  • flatMap :このFluxによって放出された要素を非同期でPublishersに変換します

マップが同期演算子であることは簡単にわかります。これは、ある値を別の値に変換する方法にすぎません。 このメソッドは、呼び出し元と同じスレッドで実行されます。

もう1つのステートメント– flatMapは非同期です–はそれほど明確ではありません。 実際、要素の Publishers への変換は、同期または非同期のいずれかになります。

サンプルコードでは、 Flux#just メソッドを使用して要素を出力するため、この操作は同期しています。 ただし、リモートサーバーなど、待ち時間が長いソースを処理する場合は、非同期処理の方が適しています

重要な点は、パイプラインは要素がどのスレッドからのものであるかを気にしないということです。それは単にパブリッシャー自身に注意を払うだけです。

6. 結論

この記事では、プロジェクトReactorのmapおよびflatMap演算子について説明しました。 いくつかの例について説明し、プロセスを明確にしました。

いつものように、私たちのアプリケーションのソースコードはGitHubから入手できます。