1前書き


Reactor Core

はリアクティブプログラミングモデルを実装するJava 8ライブラリです。これはリアクティブアプリケーションを構築するための標準であるhttp://www.reactive-streams.org/[Reactive Streams Specification]の上に構築されています。

非反応的Java開発の背景から、反応的になることはかなり急な習熟曲線になる可能性があります。 Java 8

Stream

AP​​Iと比較すると、これらは同じ高レベルの抽象概念であると誤解される可能性があるため、これはより困難になります。

この記事では、このパラダイムをわかりやすく説明します。リアクティブコードの作成方法についての画像を作成し、後のシリーズでさらに高度な記事を作成するための基礎を築くまで、Reactorを介して小さなステップを踏みます。


2リアクティブストリームの仕様

Reactorを見る前に、Reactive Streams Specificationを見てください。これがReactorが実装しているもので、ライブラリの基礎を築きます。

基本的に、Reactive Streamsは非同期ストリーム処理の仕様です。

言い換えれば、たくさんのイベントが非同期的に生成され消費されているシステムです。 1秒あたり何千もの株式の更新が財務アプリケーションに入ってくることを考えてみてください。そして、それがタイムリーにそれらの更新に応答する必要があるためです。

これの主な目的の1つは、背圧の問題に対処することです。イベントを処理できるよりも早くコンシューマにイベントを発行しているプロデューサがあると、最終的にはコンシューマはシステムリソースを使い果たし、イベントに圧倒されることになります。バックプレッシャーとは、これを防ぐために、消費者がプロデューサーに送信するデータ量を伝えることができることを意味します。これが、仕様で規定されている内容です。


3 Mavenの依存関係

始める前に、私たちのhttps://search.maven.org/classic/#search%7Cga%7C1%7C%20(g%3A%22io.projectreactor%22%20AND%20a%3A%22reactor-coreを追加しましょう。 %22)%20OR%20(g%3A%22ch.qos.logback%22%20AND%20a%3A%22logback-classic%22)[Maven]依存関係:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.0.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.3</version>
</dependency>

依存関係としてhttps://logback.qos.ch/[Logback]も追加しています。これは、データの流れをよりよく理解するためにReactorの出力を記録するためです。


4データの流れを生み出す

アプリケーションがリアクティブになるためには、最初にできることはデータのストリームを生成することです。これは、先ほど説明した株式更新の例のようなものです。このデータがなければ、対処するものは何もないでしょう。そのため、これが論理的な最初のステップです。 Reactive Coreは、これを可能にする2つのデータ型を提供します。


4.1. フラックス

これを行う最初の方法は、


Flux

を使用することです。これは、

0..n__個の要素を放出できるストリームです。 。簡単なものを作成してみましょう。

Flux<String> just = Flux.just("1", "2", "3");

この場合、3つの要素からなる静的ストリームがあります。


4.2. モノ

これを行う2番目の方法は__https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html[Mono]を使用することです。インスタンス化してみましょう:

Mono<String> just = Mono.just("foo");

これは

Flux

とほとんど同じように見え、振る舞いますが、今回は1つの要素に制限されています。

** 4.3. なぜフラックスだけじゃないの?

さらに試す前に、なぜこれら2つのデータ型があるのか​​を強調する価値があります。

まず、

Flux



Mono

の両方がReactive Streamsの実装であることに注意してください。http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html[発行元]__インターフェース。どちらのクラスも仕様に準拠しており、代わりにこのインタフェースを使用できます。

Publisher<String> just = Mono.just("foo");

しかし、実際には、この濃度を知ることは有用です。これは、いくつかの操作は2つのタイプのうちの1つのみに意味があり、それがより表現力に富んだものになる可能性があるためです(リポジトリの

findOne()

を想像してください)。


5ストリームを購読する

データのストリームを生成する方法の概要を説明しました。次に、要素を発行するためにデータをサブスクライブする必要があります。


5.1. 要素を集める


subscribe()

メソッドを使用して、ストリーム内のすべての要素を収集しましょう。

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(elements::add);

assertThat(elements).containsExactly(1, 2, 3, 4);

購読するまでデータは流れ始めません。ロギングも追加したことに注意してください。これは、舞台裏で何が起こっているのかを見るときに役立ちます。


5.2. 要素の流れ

ログインしたら、それを使ってデータがストリームをどのように流れているかを視覚化できます。

20:25:19.550[main]INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable]FluxArray.ArraySubscription)
20:25:19.553[main]INFO  reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553[main]INFO  reactor.Flux.Array.1 - | onNext(1)
20:25:19.553[main]INFO  reactor.Flux.Array.1 - | onNext(2)
20:25:19.553[main]INFO  reactor.Flux.Array.1 - | onNext(3)
20:25:19.553[main]INFO  reactor.Flux.Array.1 - | onNext(4)
20:25:19.553[main]INFO  reactor.Flux.Array.1 - | onComplete()

まず第一に、すべてがメインスレッドで実行されています。この記事の後半で並行性についてさらに詳しく説明するので、これについては詳しく説明しないでください。ただし、すべてを順番に処理できるため、物事が単純になります。

それでは、1つずつログに記録したシーケンスを見てみましょう。


  1. onSubscribe()

    – ストリームを購読するときに呼び出されます


  2. request(無制限)–

    私たちが

    subscribe

    を呼び出すとき、私たちは舞台裏で

を作成しています


Subscription

.

このサブスクリプションは、ストリームから要素を要求します。この場合、
デフォルトは

unboundedで、

はすべての要素を1つずつ要求することを意味します。
利用可能
。 __onNext() – これは、すべての要素に対して呼び出されます。

  1. __onComplete() – 最後を受け取った後、最後に呼び出されます.

素子。実際には

onError()

もあります。これは例外がある場合に呼び出されますが、この場合はありません。

これは、


Subscriber


Reactive Streams Specificationの一部として記述されているフローです。実際には、これが

onSubscribe()の呼び出しの舞台裏でインスタンス化されたものです。これは便利な方法ですが、何が起こっているのかを理解するために

Subscriber__インターフェースを直接提供しましょう。

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
      s.request(Long.MAX__VALUE);
    }

    @Override
    public void onNext(Integer integer) {
      elements.add(integer);
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
});

上記のフローで考えられる各段階は、

Subscriber

実装のメソッドに対応しています。

Flux

がこの冗長性を減らすためのヘルパーメソッドを提供してくれたのは偶然です。


5.3. Java 8

Streams


との比較

それでも、Java 8

Stream

がcollectを実行するのと同義の何かがあるように思われるかもしれません。

List<Integer> collected = Stream.of(1, 2, 3, 4)
  .collect(toList());

私たちだけがそうではありません。

主な違いは、Reactiveがプッシュモデルであるのに対し、Java 8

Streams

はプルモデルであることです。

反応的アプローチで。彼らがやってくると、イベントは購読者に「プッシュ」されます。

次に注目すべきことは、

Streams

端末演算子です。これは、端末で、すべてのデータを取得して結果を返すということです。 Reactiveを使えば、アドホックベースで複数の加入者を接続したり削除したりしながら、外部リソースから無限のストリームを受け取ることができます。ストリームの結合、ストリームの絞り込み、背圧の適用などを行うこともできます。これについては次に説明します。


6. 背圧

次に検討すべきことは背圧です。この例では、購読者はすべての要素を一度にプッシュするようプロデューサに指示しています。

これは、加入者にとって圧倒的になり、そのすべてのリソースを消費することになりかねません。

バックプレッシャーは、ダウンストリームが圧倒されないようにするために、ダウンストリームからアップストリームに送信するデータを減らすように指示できる場合です。

バックプレッシャを適用するように

Subscriber

実装を変更できます。


request()

を使用して、一度に2つの要素のみを送信するようにアップストリームに指示しましょう。

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
    private Subscription s;
    int onNextAmount;

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        s.request(2);
    }

    @Override
    public void onNext(Integer integer) {
        elements.add(integer);
        onNextAmount++;
        if (onNextAmount % 2 == 0) {
            s.request(2);
        }
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
});

コードを再度実行すると、

request(2)

が呼び出され、続いて

onNext()

が2回呼び出されてから、再度

request(2)

が呼び出されます。

23:31:15.395[main]INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable]FluxArray.ArraySubscription)
23:31:15.397[main]INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.397[main]INFO  reactor.Flux.Array.1 - | onNext(1)
23:31:15.398[main]INFO  reactor.Flux.Array.1 - | onNext(2)
23:31:15.398[main]INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398[main]INFO  reactor.Flux.Array.1 - | onNext(3)
23:31:15.398[main]INFO  reactor.Flux.Array.1 - | onNext(4)
23:31:15.398[main]INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398[main]INFO  reactor.Flux.Array.1 - | onComplete()

基本的に、これは反応性の引き戻し圧力です。私たちは上流に一定量の要素だけをプッシュするよう要求しています、そして私たちが準備ができているときだけ。 Twitterからツイートをストリーミング配信していると想像した場合、何をすべきかを決定するのはアップストリーム次第です。つぶやきが入ってきたが下流からの要求がない場合、上流はアイテムを落としたり、それらをバッファに格納したり、あるいはその他の戦略をとることができます。


7. ストリームを操作する

必要に応じてイベントに応答して、ストリーム内のデータに対して操作を実行することもできます。


7.1. ストリーム内のデータのマッピング

簡単な操作は、変換を適用することです。この場合は、ストリーム内のすべての数字を2倍にしましょう。

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i **  2)
  .subscribe(elements::add);


onNext()

が呼び出されると、

map()

が適用されます。


7.2. 2つのストリームを組み合わせる

それから別のストリームをこのストリームと組み合わせることで物事をもっと面白くすることができます。

zip()

function


_を使って試してみましょう:

_

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i **  2)
  .zipWith(Flux.range(0, Integer.MAX__VALUE),
    (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
  .subscribe(elements::add);

assertThat(elements).containsExactly(
  "First Flux: 2, Second Flux: 0",
  "First Flux: 4, Second Flux: 1",
  "First Flux: 6, Second Flux: 2",
  "First Flux: 8, Second Flux: 3");

ここでは、1ずつ増加し続け、元のものと一緒にストリーミングする別の

Flux

を作成します。ログを調べることで、これらがどのように連携しているかを見ることができます。

20:04:38.064[main]INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable]FluxArray.ArraySubscription)
20:04:38.065[main]INFO  reactor.Flux.Array.1 - | onNext(1)
20:04:38.066[main]INFO  reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable]FluxRange.RangeSubscription)
20:04:38.066[main]INFO  reactor.Flux.Range.2 - | onNext(0)
20:04:38.067[main]INFO  reactor.Flux.Array.1 - | onNext(2)
20:04:38.067[main]INFO  reactor.Flux.Range.2 - | onNext(1)
20:04:38.067[main]INFO  reactor.Flux.Array.1 - | onNext(3)
20:04:38.067[main]INFO  reactor.Flux.Range.2 - | onNext(2)
20:04:38.067[main]INFO  reactor.Flux.Array.1 - | onNext(4)
20:04:38.067[main]INFO  reactor.Flux.Range.2 - | onNext(3)
20:04:38.067[main]INFO  reactor.Flux.Array.1 - | onComplete()
20:04:38.067[main]INFO  reactor.Flux.Array.1 - | cancel()
20:04:38.067[main]INFO  reactor.Flux.Range.2 - | cancel()


Flux

ごとに1つのサブスクリプションがあることに注意してください。

onNext()呼び出しも交互に行われるので、

zip()__関数を適用すると、ストリーム内の各要素のインデックスは一致します。


8ホットストリーム

現在、主にコールドストリームに焦点を当てています。これらは、扱いやすい静的な固定長のストリームです。リアクティブのより現実的なユースケースは無限に起こるものかもしれません。

たとえば、マウスの動きを連続的に反応させたり、さえずりを送ったりする必要があります。これらのタイプのストリームは、常に実行されており、いつでも購読できるため、データの先頭がなくてもホットストリームと呼ばれます。


8.1.

ConnectableFlux


を作成する

ホットストリームを作成する1つの方法は、コールドストリームを1つに変換することです。

永遠に続く

Flux

を作成して、結果をコンソールに出力しましょう。これは、外部リソースからの無限のデータストリームをシミュレートします。

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while(true) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .publish();


publish()

を呼び出すことで、


ConnectableFlux

が与えられます。これは、

subscribe()__ won ‘を呼び出すことを意味します。 t複数の購読を追加することを許可して、それが放出し始めるようにします。

publish.subscribe(System.out::println);
publish.subscribe(System.out::println);

このコードを実行しても何も起こりません。

Flux

が発光し始めるのは、__connect()を呼び出すまでです。購読しているかどうかは関係ありません。


8.2. スロットル

コードを実行すると、コンソールにログ記録がいっぱいになります。これは、あまりにも多くのデータが消費者に渡される状況をシミュレートしています。これをスロットルで回避してみましょう。

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while(true) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .sample(ofSeconds(2))
  .publish();

ここでは、2秒間隔で

sample()

メソッドを導入しました。これで、値は2秒ごとにサブスクライバにプッシュされるだけになります。つまり、コンソールは多忙なものにはなりません。

もちろん、ウィンドウイングやバッファリングなど、ダウンストリームに送信されるデータ量を減らすための複数の戦略がありますが、それらはこの記事の範囲外です。


9並行性

上記の例はすべてメインスレッドで実行されています。

ただし、必要に応じて、コードをどのスレッドで実行するかを制御できます。


Scheduler


インターフェースは、非同期コードを抽象化したもので、そのために多くの実装が用意されています。 mainに別のスレッドを購読してみましょう。

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i **  2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(elements::add);


Parallel

スケジューラは、サブスクリプションを別のスレッドで実行するようにします。これはログを見ることで証明できます。

20:03:27.505[main]DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529[parallel-1]INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable]FluxArray.ArraySubscription)
20:03:27.531[parallel-1]INFO  reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531[parallel-1]INFO  reactor.Flux.Array.1 - | onNext(1)
20:03:27.531[parallel-1]INFO  reactor.Flux.Array.1 - | onNext(2)
20:03:27.531[parallel-1]INFO  reactor.Flux.Array.1 - | onNext(3)
20:03:27.531[parallel-1]INFO  reactor.Flux.Array.1 - | onNext(4)
20:03:27.531[parallel-1]INFO  reactor.Flux.Array.1 - | onComplete()

並行性はこれをより面白くします、そしてそれは我々がそれを別の記事で探る価値があるでしょう。


10結論

この記事では、Reactive Coreの概要について説明しました。ストリームのパブリッシュとサブスクライブ、バックプレッシャーの適用、ストリームの操作、およびデータの非同期処理を説明しました。これにより、リアクティブアプリケーションを作成するための基盤を築くことができます。

このシリーズの後半の記事では、より高度な並行処理およびその他の反応的な概念について説明します。リンクをカバーする別の記事もあります:/spring-reactor[Reactor with Spring]。

我々のアプリケーションのソースコードはhttps://github.com/eugenp/tutorials/tree/master/reactor-core[GitHubで利用可能]で入手可能です。これはMavenプロジェクトであり、そのまま実行できます。