ProjectReactorでの例外の処理
1. 概要
このチュートリアルでは、 ProjectReactorで例外を処理するいくつかの方法を見ていきます。 コード例で紹介されている演算子は、MonoクラスとFluxクラスの両方で定義されています。 ただし、ここではFluxクラスのメソッドのみに焦点を当てます。
2. Mavenの依存関係
Reactorコア依存関係を追加することから始めましょう。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId
<version>3.4.9</version>
</dependency>
3. パイプラインオペレーターで直接例外をスローする
例外を処理する最も簡単な方法は、それをスローすることです。 ストリーム要素の処理中に何か異常が発生した場合、通常のメソッド実行であるかのように、throwキーワードを使用して例外をスローできます。
StringからIntegerへのストリームを解析する必要があると仮定します。 要素が数値のStringでない場合は、Exceptionをスローする必要があります。
このような変換には、map演算子を使用するのが一般的な方法です。
Function<String, Integer> mapper = input -> {
if (input.matches("\\D")) {
throw new NumberFormatException();
} else {
return Integer.parseInt(input);
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);
ご覧のとおり、入力要素が無効な場合、演算子はExceptionをスローします。 このようにExceptionをスローすると、 Reactorがそれをキャッチし、ダウンストリームにエラーを通知します。
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
このソリューションは機能しますが、エレガントではありません。 Reactive Streams仕様、ルール2.13 で指定されているように、オペレーターは正常に戻る必要があります。 Reactorは、Exceptionをエラー信号に変換することで私たちを助けてくれました。 しかし、もっとうまくやることができます。
基本的に、リアクティブストリームは、onErrorメソッドに依存して障害状態を示します。 ほとんどの場合、この条件は、Publisherでのエラーメソッドの呼び出しによってトリガーされる必要があります。 このユースケースにExceptionを使用すると、従来のプログラミングに戻ります。
4. handle演算子での例外の処理
map 演算子と同様に、 handle 演算子を使用して、ストリーム内のアイテムを1つずつ処理できます。 違いは、 Reactorがハンドルオペレーターに出力シンクを提供し、より複雑な変換を適用できることです。
前のセクションの例を更新して、handle演算子を使用してみましょう。
BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
if (input.matches("\\D")) {
sink.error(new NumberFormatException());
} else {
sink.next(Integer.parseInt(input));
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);
map 演算子とは異なり、 handle演算子は、要素ごとに1回呼び出される機能的なコンシューマーを受け取ります。 このコンシューマーには、アップストリームからの要素と、ダウンストリームに送信される出力を構築するSynchronousSinkの2つのパラメーターがあります。
入力要素が数値のStringの場合、シンクで next メソッドを呼び出し、入力から変換されたIntegerを提供します。 数値のStringでない場合は、Exceptionオブジェクトを指定してerrorメソッドを呼び出して状況を示します。
エラーメソッドを呼び出すと、アップストリームへのサブスクリプションがキャンセルされ、ダウンストリームでonErrorメソッドが呼び出されることに注意してください。 errorとonErrorのこのようなコラボレーションは、リアクティブストリームでExceptionを処理する標準的な方法です。
出力ストリームを確認してみましょう。
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
5. flatMap演算子での例外の処理
エラー処理をサポートするもう1つの一般的に使用される演算子は、flatMapです。 この演算子は、入力要素を Publisher に変換してから、Publisherを新しいストリームにフラット化します。 これらのPublisherを利用して、誤った状態を示すことができます。
flatMapを使用して同じ例を試してみましょう。
Function<String, Publisher<Integer>> mapper = input -> {
if (input.matches("\\D")) {
return Mono.error(new NumberFormatException());
} else {
return Mono.just(Integer.parseInt(input));
}
};
Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NumberFormatException.class)
.verify();
当然のことながら、結果は以前と同じです。
エラー処理に関するhandleとflatMapの唯一の違いは、handleオペレーターがシンクでerrorメソッドを呼び出すのに対し、flatMapはPublisherで呼び出すことです。
Fluxオブジェクトで表されるストリームを処理している場合は、concatMapを使用してエラーを処理することもできます。 このメソッドはflatMapとほとんど同じように動作しますが、非同期処理をサポートしていません。
6. NullPointerExceptionの回避
このセクションでは、 null 参照の処理について説明します。これにより、Javaで一般的に発生するExceptionであるNullPointerExceptionが発生することがよくあります。 この例外を回避するために、通常、変数を null と比較し、その変数が実際に null である場合は、実行を別の方法に指示します。 リアクティブストリームでも同じことをしたくなります。
Function<String, Integer> mapper = input -> {
if (input == null) {
return 0;
} else {
return Integer.parseInt(input);
}
};
入力値がnullの場合はすでに処理しているため、NullPointerExceptionは発生しないと思われるかもしれません。 ただし、現実には別の話があります。
Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);
StepVerifier.create(outFlux)
.expectNext(1)
.expectError(NullPointerException.class)
.verify();
どうやら、 NullPointerExceptionがダウンストリームでエラーをトリガーしました。これは、nullチェックが機能しなかったことを意味します。
それが起こった理由を理解するには、ReactiveStreamsの仕様に戻る必要があります。 仕様のルール2.13は、「 onSubscribe 、 onNext 、 onError 、または onComplete を呼び出すと、通常どおりに戻る必要があります。指定されたパラメータがnullの場合、java.lang.NullPointerExceptionを呼び出し元にスローする必要があります。
仕様で要求されているように、null値がマップ関数に達すると、ReactorはNullPointerExceptionをスローします。
したがって、特定のストリームに到達したときのnull値については何もできません。 ダウンストリームに渡す前に、それを処理したり、null以外の値に変換したりすることはできません。 したがって、 NullPointerExceptionを回避する唯一の方法は、null値がパイプラインに到達しないようにすることです。
7. 結論
この記事では、ProjectReactorでの例外の処理について説明しました。 いくつかの例について説明し、プロセスを明確にしました。 また、リアクティブストリームの処理時に発生する可能性のある例外の特殊なケースであるNullPointerExceptionについても説明しました。
いつものように、私たちのアプリケーションのソースコードはGitHubでから入手できます。