1. 序章
Reactor Core は、リアクティブプログラミングモデルを実装するJava8ライブラリです。 これは、リアクティブアプリケーションを構築するための標準である ReactiveStreams仕様に基づいて構築されています。
非リアクティブJava開発の背景から、リアクティブになることは非常に急な学習曲線になる可能性があります。 これは、Java 8 Stream APIと比較すると、同じ高レベルの抽象化であると誤解される可能性があるため、より困難になります。
この記事では、このパラダイムをわかりやすく説明します。 リアクティブコードを作成する方法の図を作成し、後のシリーズで登場するより高度な記事の基礎を築くまで、Reactorを少しずつ進めていきます。
2. リアクティブストリームの仕様
Reactorを見る前に、ReactiveStreamsの仕様を見る必要があります。 これはReactorが実装するものであり、ライブラリの基礎を築きます。
基本的に、ReactiveStreamsは非同期ストリーム処理の仕様です。
つまり、多くのイベントが非同期で生成および消費されるシステムです。 金融アプリケーションに入る毎秒数千の株式更新のストリームについて考えてみてください。そして、それらの更新にタイムリーに応答する必要があります。
これの主な目標の1つは、背圧の問題に対処することです。 イベントを処理できるよりも速くコンシューマーにイベントを発行しているプロデューサーがいる場合、最終的にはコンシューマーはイベントに圧倒され、システムリソースが不足します。
背圧とは、これを防ぐために消費者が送信するデータの量をプロデューサーに伝えることができる必要があることを意味します。これが仕様に記載されています。
3. Mavenの依存関係
始める前に、Mavenの依存関係を追加しましょう。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.16</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.6</version>
</dependency>
また、依存関係としてLogbackを追加しています。 これは、データの流れをよりよく理解するために、Reactorの出力をログに記録するためです。
4. データストリームの生成
アプリケーションがリアクティブであるためには、アプリケーションが最初に実行できる必要があるのは、データのストリームを生成することです。
これは、前に示した株式更新の例のようなものである可能性があります。 このデータがなければ、反応するものは何もありません。これが論理的な最初のステップである理由です。
Reactive Coreは、これを可能にする2つのデータ型を提供します。
4.1. フラックス
これを行う最初の方法はフラックス。 放出できるストリームです 0..n 要素。 簡単なものを作成してみましょう:
Flux<Integer> just = Flux.just(1, 2, 3, 4);
この場合、4つの要素の静的ストリームがあります。
4.2. 単核症
これを行う2番目の方法は、 Mono、を使用することです。これは、0..1要素のストリームです。 1つをインスタンス化してみましょう:
Mono<Integer> just = Mono.just(1);
これは、 Flux とほぼ同じように見え、動作しますが、今回は1つの要素のみに制限されています。
4.3. なぜフラックスだけではないのですか?
さらに実験する前に、これら2つのデータ型がある理由を強調する価値があります。
まず、FluxとMonoの両方がReactiveStreamsPublisherインターフェースの実装であることに注意してください。 どちらのクラスも仕様に準拠しており、代わりにこのインターフェースを使用できます。
Publisher<String> just = Mono.just("foo");
しかし、実際には、このカーディナリティを知ることは有用です。 これは、いくつかの操作が2つのタイプのいずれかに対してのみ意味があり、より表現力が高いためです(リポジトリ内の findOne()を想像してください)。
5. ストリームの購読
これで、データストリームを生成する方法の概要がわかりました。要素を出力するには、データストリームをサブスクライブする必要があります。
5.1. 要素の収集
subscribe()メソッドを使用して、ストリーム内のすべての要素を収集してみましょう。
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(elements::add);
assertThat(elements).containsExactly(1, 2, 3, 4);
サブスクライブするまで、データは流れ始めません。 ロギングも追加したことに注意してください。これは、舞台裏で何が起こっているかを確認するときに役立ちます。
5.2. 要素の流れ
ロギングが適切に行われると、それを使用して、データがストリームをどのように流れているかを視覚化できます。
20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()
まず、すべてがメインスレッドで実行されています。 この記事の後半で並行性についてさらに検討するため、これについては詳しく説明しません。 ただし、すべてを順番に処理できるため、作業は簡単になります。
次に、ログに記録したシーケンスを1つずつ見ていきましょう。
- onSubscribe() –これはストリームをサブスクライブするときに呼び出されます
- リクエスト(無制限)– 電話するとき申し込む 、舞台裏で私たちは作成していますサブスクリプション。 このサブスクリプションは、ストリームから要素を要求します。 この場合、デフォルトで unboundedになります。つまり、は、使用可能なすべての要素を要求することを意味します。
- onNext()–これはすべての要素で呼び出されます
- onComplete()– これは、最後の要素を受け取った後、最後に呼び出されます。 実際にはonError()もあり、例外がある場合に呼び出されますが、この場合はありません。
これは、 サブスクライバー Reactive Streams仕様の一部としてのインターフェース、そして実際には、それが私たちの呼び出しの舞台裏でインスタンス化されたものです onSubscribe()。 これは便利な方法ですが、何が起こっているのかをよりよく理解するために、 サブスクライバー直接インターフェース:
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
上記のフローの可能な各ステージは、サブスクライバー実装のメソッドにマップされていることがわかります。 Flux が、この冗長性を減らすためのヘルパーメソッドを提供してくれたのはたまたまです。
5.3. Java8ストリームとの比較
それでも、収集を実行しているJava 8 Streamと同義の何かがあるように見える場合があります。
List<Integer> collected = Stream.of(1, 2, 3, 4)
.collect(toList());
私たちだけがしません。
主な違いは、Reactiveがプッシュモデルであるのに対し、Java 8 Streamsはプルモデルであるということです。 リアクティブアプローチでは、イベントはサブスクライバーが着信するとプッシュされます。
次に注意するのは、 Streams ターミナル演算子は、ターミナルであり、すべてのデータをプルして結果を返すことです。 Reactiveを使用すると、アドホックベースで複数のサブスクライバーを接続および削除して、外部リソースから無限のストリームを受信できます。 また、ストリームの結合、ストリームの抑制、背圧の適用などを行うこともできます。これについては次に説明します。
6. 背圧
次に考慮すべきことは背圧です。 この例では、サブスクライバーはプロデューサーにすべての要素を一度にプッシュするように指示しています。 これは、サブスクライバーにとって圧倒的になり、すべてのリソースを消費する可能性があります。
バックプレッシャとは、ダウンストリームがアップストリームに送信するデータを減らして、データが圧倒されるのを防ぐように指示できる場合です。
サブスクライバーの実装を変更して、バックプレッシャを適用できます。 request()を使用して、一度に2つの要素のみを送信するようにアップストリームに指示しましょう。
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
コードを再度実行すると、 request(2)が呼び出され、続いて2つの onNext()呼び出し、 request(2)が呼び出されます。 ]また。
23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()
基本的に、これはリアクティブプルバックプレッシャです。 準備ができたときにのみ、特定の量の要素のみをプッシュするようにアップストリームに要求しています。
Twitterからツイートがストリーミングされていると想像すると、何をすべきかを決めるのはアップストリーム次第です。 ツイートが届いていて、ダウンストリームからのリクエストがない場合、アップストリームはアイテムをドロップしたり、バッファに保存したり、その他の戦略を実行したりする可能性があります。
7. ストリームでの操作
また、ストリーム内のデータに対して操作を実行し、適切と思われるイベントに応答することもできます。
7.1. ストリーム内のデータのマッピング
実行できる簡単な操作は、変換を適用することです。 この場合、ストリーム内のすべての数値を2倍にします。
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);
map()は、 onNext()が呼び出されたときに適用されます。
7.2. 2つのストリームを組み合わせる
次に、別のストリームをこのストリームと組み合わせることで、物事をより面白くすることができます。 zip() function :を使用してこれを試してみましょう
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");
ここでは、別の Flux を作成して、1つずつインクリメントし続け、元のFluxと一緒にストリーミングします。 ログを調べることで、これらがどのように連携するかを確認できます。
20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()
Fluxごとに1つのサブスクリプションがあることに注意してください。 onNext()呼び出しも交互に行われるため、 zip()関数を適用すると、ストリーム内の各要素のインデックスが一致します。
8. ホットストリーム
現在、私たちは主にコールドストリームに焦点を当てています。 これらは静的な固定長のストリームであり、扱いが簡単です。 リアクティブのより現実的なユースケースは、無限に発生するものかもしれません。
たとえば、常に反応する必要のあるマウスの動きやTwitterフィードのストリームを作成できます。 これらのタイプのストリームは、常に実行されており、いつでもサブスクライブでき、データの開始が欠落しているため、ホットストリームと呼ばれます。
8.1. ConnectableFluxの作成
ホットストリームを作成する1つの方法は、コールドストリームを1つに変換することです。 永遠に続くFluxを作成して、結果をコンソールに出力します。これにより、外部リソースからのデータの無限のストリームがシミュレートされます。
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.publish();
電話で公開() 私たちは与えられます ConnectableFlux。 これは、 申し込む() 放出を開始せず、複数のサブスクリプションを追加できるようにします。
publish.subscribe(System.out::println);
publish.subscribe(System.out::println);
このコードを実行しようとしても、何も起こりません。 connect()、を呼び出すまで、Fluxは放出を開始しません。
publish.connect();
8.2. スロットル
コードを実行すると、コンソールはロギングに圧倒されます。 これは、消費者に渡されるデータが多すぎる状況をシミュレートしています。 スロットルでこれを回避してみましょう:
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.sample(ofSeconds(2))
.publish();
ここでは、2秒間隔の sample()メソッドを紹介しました。 これで、値は2秒ごとにサブスクライバーにプッシュされるだけになります。つまり、コンソールの負荷が大幅に軽減されます。
もちろん、ウィンドウ処理やバッファリングなど、ダウンストリームに送信されるデータの量を減らすための複数の戦略がありますが、これらはこの記事の範囲外になります。
9. 並行性
上記の例はすべて、現在メインスレッドで実行されています。 ただし、必要に応じて、コードを実行するスレッドを制御できます。 Scheduler インターフェースは、非同期コードを抽象化したものであり、多くの実装が提供されています。 mainとは別のスレッドにサブスクライブしてみましょう。
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
Parallel スケジューラーを使用すると、サブスクリプションが別のスレッドで実行されます。これは、ログを確認することで証明できます。 最初のエントリはmainスレッドからのものであり、Fluxはparallel-1と呼ばれる別のスレッドで実行されています。
20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()
並行性はこれよりも興味深いものであり、別の記事で検討する価値があります。
10. 結論
この記事では、ReactiveCoreの概要をエンドツーエンドで説明しました。 ストリームのパブリッシュとサブスクライブ、バックプレッシャの適用、ストリームの操作、およびデータの非同期処理の方法について説明しました。 これにより、リアクティブアプリケーションを作成するための基礎が築かれるはずです。
このシリーズの後半の記事では、より高度な並行性とその他の事後対応の概念について説明します。 Reactor withSpringをカバーする別の記事もあります。
このアプリケーションのソースコードは、GitHubでから入手できます。 これは、そのまま実行できるはずのMavenプロジェクトです。