1.概要

この記事では、Java 9 Reactive Streamsに注目します。簡単に言うと、リアクティブストリーム処理ロジックを構築するための主要な構成要素を含む

Flow

クラスを使用することができます。


Reactive Streams

は、ノンブロッキングバックプレッシャを使用した非同期ストリーム処理の標準です。この仕様は、


Reactive Manifesto



に定義されており、さまざまな実装があります。たとえば、

RxJava



Akka-Streams.


2リアクティブAPIの概要


Flow

を構築するために、3つの主要な抽象化を使い、それらを非同期処理ロジックに組み立てることができます。

  • すべての

    Flow

    は、Publisherインスタンスによってそれに発行されたイベントを処理する必要があります** 。

    Publisher

    には、1つのメソッド –

    subscribe().

    があります。

それによって発行されたイベントを受信したい購読者がいる場合は、指定された

Publisher.

に購読する必要があります。

  • メッセージの受信者は

    Subscriber


    インターフェースを実装する必要があります** 通常、これはインスタンスがそれ以上メッセージを送信しないため、すべての

    Flow__処理の終わりです。


Subscriber



Sink.

と考えることができます。これには、オーバーライドする必要がある4つのメソッド、

onSubscribe()、onNext()、onError()、

、および__onComplete()があります。 。

  • 受信メッセージを変換してさらに次の

    Subscriberに渡したい場合は、

    Processor

    インタフェースを実装する必要があります。** メッセージを受信するので

    Subscriber

    として、またメッセージを処理して送信するので

    Publisher__として機能しますさらなる処理のために。

** 3メッセージのパブリッシュと消費

単純な

Flowを作成したいとしましょう

、そこには

Publisher

パブリッシングメッセージと、到着したときに単純な

Subscriber

消費メッセージがあります。


EndSubscriber

クラスを作成しましょう。

Subscriber

インターフェースを実装する必要があります。次に、必要なメソッドをオーバーライドします。

処理が始まる前に

onSubscribe()

メソッドが呼び出されます。

Subscription

のインスタンスが引数として渡されます。

Subscriber

と__Publisherの間のメッセージの流れを制御するために使用されるクラスです。

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
}

テストで使用する空の

List

of

consumedElements

も初期化しました。

それでは、残りのメソッドを

Subscriber

インターフェースから実装する必要があります。ここでの主なメソッドはonNext()です。これは

Publisher

が新しいメッセージを発行するたびに呼び出されます。

@Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    subscription.request(1);
}


onSubscribe()

メソッドでサブスクリプションを開始したとき、およびメッセージを処理したときに、

Subscription



request()

メソッドを呼び出して、現在の

Subscriber

がより多くのメッセージを消費する準備ができていることを通知する必要があります。

最後に、

onError()

を実装する必要があります。これは、処理で何らかの例外がスローされるときにはいつでも呼び出されます。

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}

@Override
public void onComplete() {
    System.out.println("Done");
}

Processing __Flowのテストを書きましょう。


N

個の要素を

Publisher

に送信します。これを

EndSubscriber

が受け取ります。

@Test
public void whenSubscribeToIt__thenShouldConsumeAll()
  throws InterruptedException {

   //given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");

   //when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

   //then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}


EndSubscriberのインスタンスで

close()

メソッドを呼び出していることに注意してください。指定された

Publisher

のすべての

Subscriber

で、その下の

onComplete()__コールバックが呼び出されます。

そのプログラムを実行すると、次のような出力が生成されます。

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done


4メッセージの変換


Publisher



Subscriber

の間で同様のロジックを構築したいが、変換も適用したいとしましょう。


Processor

を実装し、

SubmissionPublisherを拡張した

TransformProcessor__クラスを作成します。

入力を出力に変換する

Function

を渡します。

public class TransformProcessor<T, R>
  extends SubmissionPublisher<R>
  implements Flow.Processor<T, R> {

    private Function<T, R> function;
    private Flow.Subscription subscription;

    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        close();
    }
}

それでは、

Publisher



String

要素を公開している処理フローで

簡単なテストを書いてみましょう


TransformProcessor

は、

String



Integer

として構文解析します。つまり、変換はここで行われる必要があります。

@Test
public void whenSubscribeAndTransformElements__thenShouldConsumeAll()
  throws InterruptedException {

   //given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);

   //when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();

   //then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() ->
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

ベース

Publisher



close()

メソッドを呼び出すと、

TransformProcessor



onComplete()

メソッドが呼び出されることに注意してください。

処理チェーン内のすべてのパブリッシャーはこのようにしてクローズする必要があることに留意してください。


5

Subscription


を使用してメッセージの需要を制御する

Subscriptionの最初の要素だけを使い、ロジックを適用して処理を終了したいとしましょう。これを実現するために

request()

メソッドを使用できます。

N個のメッセージのみを消費するように

EndSubscriber

を変更しましょう。

その数を

howMuchMessagesConsume

コンストラクタ引数として渡します。

public class EndSubscriber<T> implements Subscriber<T> {

    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();

    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume
          = new AtomicInteger(howMuchMessagesConsume);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1);
        }
    }
   //...

}

必要に応じて要素を要求できます。

指定された__Subscriptionから1つの要素のみを消費するテストを書きましょう。

@Test
public void whenRequestForOnlyOneElement__thenShouldConsumeOne()
  throws InterruptedException {

   //given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1);
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");

   //when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();

   //then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() ->
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}


publisher

は6つの要素を公開していますが、

EndSubscriber

は1つの要素のみを処理することを要求するので、1つの要素のみを消費します。


Subscriptionで

request()__メソッドを使用することで、メッセージの消費速度を制御するためのより洗練されたバックプレッシャーメカニズムを実装できます。


6. 結論

この記事では、Java 9 Reactive Streamsについて説明しました。


Publisher



Subscriberからなる処理

Flow__を作成する方法を見ました。

最後に、

Subscriptionberを使用して要素の需要を制御するために

Subscription__を使用しました。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-9[GitHubプロジェクト]にあります。そのままインポートして実行するのは簡単です。