1. 概要

このチュートリアルでは、 ProjectReactorで例外を処理するいくつかの方法を見ていきます。 コード例で紹介されている演算子は、MonoクラスとFluxクラスの両方で定義されています。 ただし、ここではFluxクラスのメソッドのみに焦点を当てます。

2. Mavenの依存関係

Reactorコア依存関係を追加することから始めましょう。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId
    <version>3.4.9</version>
</dependency>

3. パイプラインオペレーターで直接例外をスローする

例外を処理する最も簡単な方法は、それをスローすることです。 ストリーム要素の処理中に何か異常が発生した場合、通常のメソッド実行であるかのように、throwキーワードを使用して例外をスローできます。

StringからIntegerへのストリームを解析する必要があると仮定します。 要素が数値のStringでない場合は、Exceptionをスローする必要があります。

このような変換には、map演算子を使用するのが一般的な方法です。

Function<String, Integer> mapper = input -> {
    if (input.matches("\\D")) {
        throw new NumberFormatException();
    } else {
        return Integer.parseInt(input);
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);

ご覧のとおり、入力要素が無効な場合、演算子はExceptionをスローします。 このようにExceptionをスローすると、 Reactorがそれをキャッチし、ダウンストリームにエラーを通知します。

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

このソリューションは機能しますが、エレガントではありません。 Reactive Streams仕様、ルール2.13 で指定されているように、オペレーターは正常に戻る必要があります。 Reactorは、Exceptionをエラー信号に変換することで私たちを助けてくれました。 しかし、もっとうまくやることができます。

基本的に、リアクティブストリームは、onErrorメソッドに依存して障害状態を示します。 ほとんどの場合、この条件は、Publisherでのエラーメソッドの呼び出しによってトリガーされる必要があります。 このユースケースにExceptionを使用すると、従来のプログラミングに戻ります。

4. handle演算子での例外の処理

map 演算子と同様に、 handle 演算子を使用して、ストリーム内のアイテムを1つずつ処理できます。 違いは、 Reactorがハンドルオペレーターに出力シンクを提供し、より複雑な変換を適用できることです。

前のセクションの例を更新して、handle演算子を使用してみましょう。

BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
    if (input.matches("\\D")) {
        sink.error(new NumberFormatException());
    } else {
        sink.next(Integer.parseInt(input));
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);

map 演算子とは異なり、 handle演算子は、要素ごとに1回呼び出される機能的なコンシューマーを受け取ります。 このコンシューマーには、アップストリームからの要素と、ダウンストリームに送信される出力を構築するSynchronousSinkの2つのパラメーターがあります。

入力要素が数値のStringの場合、シンクで next メソッドを呼び出し、入力から変換されたIntegerを提供します。 数値のStringでない場合は、Exceptionオブジェクトを指定してerrorメソッドを呼び出して状況を示します。

エラーメソッドを呼び出すと、アップストリームへのサブスクリプションがキャンセルされ、ダウンストリームでonErrorメソッドが呼び出されることに注意してください。 erroronErrorのこのようなコラボレーションは、リアクティブストリームでExceptionを処理する標準的な方法です。

出力ストリームを確認してみましょう。

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

5. flatMap演算子での例外の処理

エラー処理をサポートするもう1つの一般的に使用される演算子は、flatMapです。 この演算子は、入力要素を Publisher に変換してから、Publisherを新しいストリームにフラット化します。 これらのPublisherを利用して、誤った状態を示すことができます。

flatMapを使用して同じ例を試してみましょう。

Function<String, Publisher<Integer>> mapper = input -> {
    if (input.matches("\\D")) {
        return Mono.error(new NumberFormatException());
    } else {
        return Mono.just(Integer.parseInt(input));
    }
};

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

当然のことながら、結果は以前と同じです。

エラー処理に関するhandleとflatMapの唯一の違いは、handleオペレーターがシンクでerrorメソッドを呼び出すのに対し、flatMapはPublisherで呼び出すことです。

Fluxオブジェクトで表されるストリームを処理している場合は、concatMapを使用してエラーを処理することもできます。 このメソッドはflatMapとほとんど同じように動作しますが、非同期処理をサポートしていません。

6. NullPointerExceptionの回避

このセクションでは、 null 参照の処理について説明します。これにより、Javaで一般的に発生するExceptionであるNullPointerExceptionが発生することがよくあります。 この例外を回避するために、通常、変数を null と比較し、その変数が実際に null である場合は、実行を別の方法に指示します。 リアクティブストリームでも同じことをしたくなります。

Function<String, Integer> mapper = input -> {
    if (input == null) {
        return 0;
    } else {
        return Integer.parseInt(input);
    }
};

入力値がnullの場合はすでに処理しているため、NullPointerExceptionは発生しないと思われるかもしれません。 ただし、現実には別の話があります。

Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NullPointerException.class)
    .verify();

どうやら、 NullPointerExceptionがダウンストリームでエラーをトリガーしました。これは、nullチェックが機能しなかったことを意味します

それが起こった理由を理解するには、ReactiveStreamsの仕様に戻る必要があります。 仕様のルール2.13は、「 onSubscribe onNext onError 、または onComplete を呼び出すと、通常どおりに戻る必要があります。指定されたパラメータがnullの場合、java.lang.NullPointerExceptionを呼び出し元にスローする必要があります。

仕様で要求されているように、null値がマップ関数に達すると、ReactorはNullPointerExceptionをスローします。

したがって、特定のストリームに到達したときのnull値については何もできません。 ダウンストリームに渡す前に、それを処理したり、null以外の値に変換したりすることはできません。 したがって、 NullPointerExceptionを回避する唯一の方法は、null値がパイプラインに到達しないようにすることです

7. 結論

この記事では、ProjectReactorでの例外の処理について説明しました。 いくつかの例について説明し、プロセスを明確にしました。 また、リアクティブストリームの処理時に発生する可能性のある例外の特殊なケースであるNullPointerExceptionについても説明しました。

いつものように、私たちのアプリケーションのソースコードはGitHubから入手できます。