1. 概要

この記事では、Java9リアクティブストリームについて説明します。 簡単に言えば、 Flow クラスを使用できるようになります。このクラスは、リアクティブストリーム処理ロジックを構築するための主要なビルディングブロックを囲みます。

Reactive Streams は、ノンブロッキングバックプレッシャを使用した非同期ストリーム処理の標準です。 この仕様は、 反応性マニフェスト、 たとえば、さまざまな実装があります。 RxJava また Akka-Streams。

2. リアクティブAPIの概要

Flow を構築するために、3つの主要な抽象化を使用して、それらを非同期処理ロジックに構成できます。

すべてのフローは、パブリッシャーインスタンスによってフローに公開されたイベントを処理する必要があります ; the 出版社 1つの方法があります– 申し込む()。

サブスクライバーのいずれかがそれによって発行されたイベントを受信したい場合は、指定されたパブリッシャーにサブスクライブする必要があります。

メッセージの受信者はサブスクライバーインターフェイスを実装する必要があります。インスタンスはそれ以上メッセージを送信しないため、通常、これはすべてのフロー処理の終わりです。

私たちは考えることができますサブスクライバーとしてシンク。 これには、オーバーライドする必要のある4つのメソッドがあります– onSubscribe()、onNext()、onError()、 と onComplete()。 次のセクションでそれらを見ていきます。

着信メッセージを変換して次のサブスクライバーにさらに渡す場合は、プロセッサーインターフェイスを実装する必要があります。これは、メッセージを受信するためサブスクライバーとしても、 パブリッシャーは、これらのメッセージを処理し、さらに処理するために送信するためです。

3. メッセージの公開と消費

単純なフロー、を作成し、パブリッシャーがメッセージを発行し、メッセージが到着するとサブスクライバーがメッセージを消費するとします。時間。

EndSubscriberクラスを作成しましょう。 サブスクライバーインターフェースを実装する必要があります。 次に、必要なメソッドをオーバーライドします。

onSubscribe()メソッドは、処理が開始される前に呼び出されます。 サブスクリプションのインスタンスが引数として渡されます。 これは、サブスクライバーパブリッシャー:の間のメッセージのフローを制御するために使用されるクラスです。

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
}

また、テストで使用されるusedElementsの空のListを初期化しました。

次に、サブスクライバーインターフェースから残りのメソッドを実装する必要があります。 ここでの主なメソッドはonNext()です。これは、Publisherが新しいメッセージを公開するたびに呼び出されます。

@Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    consumedElements.add(item);
    subscription.request(1);
}

onSubscribe()メソッドでサブスクリプションを開始したとき、およびメッセージを処理したときに、サブスクリプション request()メソッドを呼び出す必要があることに注意してください。現在のサブスクライバーがより多くのメッセージを消費する準備ができていることを通知します。

最後に、 onError()を実装する必要があります。これは、処理中に例外がスローされるたびに呼び出されます。また、 onComplete()– は、 Publisher[ X188X]は閉じています:

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}

@Override
public void onComplete() {
    System.out.println("Done");
}

処理のテストを書いてみましょうフロー。 を使用します SubmissionPublisher クラス–からの構成 java .util.concurrent –これは出版社インターフェース。

N要素をPublisherに送信します。これは、EndSubscriberが受信します。

@Test
public void whenSubscribeToIt_thenShouldConsumeAll() 
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}

と呼んでいることに注意してください近い() のインスタンスのメソッド EndSubscriber。 呼び出す onComplete() すべての下にコールバックサブスクライバー与えられたの出版社。

そのプログラムを実行すると、次の出力が生成されます。

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

4. メッセージの変換

パブリッシャーサブスクライバーの間に同様のロジックを構築したいが、いくつかの変換を適用したいとします。

Processorを実装してSubmissionPublisher–を拡張するTransformProcessor クラスを作成します。これは、 P ublisherの両方になります。 ]およびSubscriber。

入力を出力に変換する関数を渡します。

public class TransformProcessor<T, R> 
  extends SubmissionPublisher<R> 
  implements Flow.Processor<T, R> {

    private Function<T, R> function;
    private Flow.Subscription subscription;

    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close();
    }
}

ここで、クイックテストを、パブリッシャー文字列要素を公開する処理フローで作成しましょう。

TransformProcessor は、StringIntegerとして解析します。つまり、ここで変換を行う必要があります。

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor 
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);

    // when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();

    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() -> 
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

ベースPublisherclose()メソッドを呼び出すと、 TransformProcessoronComplete()メソッドが呼び出されることに注意してください。 。

処理チェーン内のすべてのパブリッシャーは、この方法で閉じる必要があることに注意してください。

5. サブスクリプションを使用したメッセージの需要の制御

サブスクリプションの最初の要素のみを消費し、ロジックを適用して処理を終了するとします。 これを実現するには、 request()メソッドを使用できます。

EndSubscriber を変更して、N個のメッセージのみを消費するようにしましょう。 その番号をhowMuchMessagesConsumeコンストラクター引数として渡します。

public class EndSubscriber<T> implements Subscriber<T> {
 
    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume 
          = new AtomicInteger(howMuchMessagesConsume);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1);
        }
    }
    //...
    
}

必要な限り要素をリクエストできます。

指定されたサブスクリプションから1つの要素のみを消費するテストを作成しましょう:

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1);
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");

    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

    // then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() -> 
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}

パブリッシャーは6つの要素を公開していますが、 EndSubscriber は1つの要素のみを処理する要求を通知するため、1つの要素のみを消費します。

サブスクリプションでrequest()メソッドを使用することにより、は、メッセージ消費の速度を制御するためのより高度なバックプレッシャメカニズムを実装できます。

6. 結論

この記事では、Java 9ReactiveStreamsについて説明しました。

処理の作成方法を見ましたフローからなる出版社とサブスクライバー。 を使用して要素を変換することで、より複雑な処理フローを作成しましたプロセッサー

最後に、 サブスクリプションによって要素の需要を制御するサブスクライバー。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。