Java 9リアクティブストリーム
1.概要
この記事では、Java 9 Reactive Streamsに注目します。簡単に言うと、リアクティブストリーム処理ロジックを構築するための主要な構成要素を含む
Flow
クラスを使用することができます。
Reactive Streams
は、ノンブロッキングバックプレッシャを使用した非同期ストリーム処理の標準です。この仕様は、
Reactive Manifesto
、
に定義されており、さまざまな実装があります。たとえば、
RxJava
や
Akka-Streams.
2リアクティブAPIの概要
Flow
を構築するために、3つの主要な抽象化を使い、それらを非同期処理ロジックに組み立てることができます。
-
すべての
Flow
は、Publisherインスタンスによってそれに発行されたイベントを処理する必要があります** 。
Publisher
には、1つのメソッド –
subscribe().
があります。
それによって発行されたイベントを受信したい購読者がいる場合は、指定された
Publisher.
に購読する必要があります。
-
メッセージの受信者は
Subscriber
インターフェースを実装する必要があります** 通常、これはインスタンスがそれ以上メッセージを送信しないため、すべての
Flow__処理の終わりです。
Subscriber
は
Sink.
と考えることができます。これには、オーバーライドする必要がある4つのメソッド、
onSubscribe()、onNext()、onError()、
、および__onComplete()があります。 。
-
受信メッセージを変換してさらに次の
Subscriberに渡したい場合は、
Processor
インタフェースを実装する必要があります。** メッセージを受信するので
Subscriber
として、またメッセージを処理して送信するので
Publisher__として機能しますさらなる処理のために。
** 3メッセージのパブリッシュと消費
単純な
Flowを作成したいとしましょう
、そこには
Publisher
パブリッシングメッセージと、到着したときに単純な
Subscriber
消費メッセージがあります。
EndSubscriber
クラスを作成しましょう。
Subscriber
インターフェースを実装する必要があります。次に、必要なメソッドをオーバーライドします。
処理が始まる前に
onSubscribe()
メソッドが呼び出されます。
Subscription
のインスタンスが引数として渡されます。
Subscriber
と__Publisherの間のメッセージの流れを制御するために使用されるクラスです。
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
テストで使用する空の
List
of
consumedElements
も初期化しました。
それでは、残りのメソッドを
Subscriber
インターフェースから実装する必要があります。ここでの主なメソッドはonNext()です。これは
Publisher
が新しいメッセージを発行するたびに呼び出されます。
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
onSubscribe()
メソッドでサブスクリプションを開始したとき、およびメッセージを処理したときに、
Subscription
で
request()
メソッドを呼び出して、現在の
Subscriber
がより多くのメッセージを消費する準備ができていることを通知する必要があります。
最後に、
onError()
を実装する必要があります。これは、処理で何らかの例外がスローされるときにはいつでも呼び出されます。
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
Processing __Flowのテストを書きましょう。
N
個の要素を
Publisher
に送信します。これを
EndSubscriber
が受け取ります。
@Test
public void whenSubscribeToIt__thenShouldConsumeAll()
throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}
EndSubscriberのインスタンスで
close()
メソッドを呼び出していることに注意してください。指定された
Publisher
のすべての
Subscriber
で、その下の
onComplete()__コールバックが呼び出されます。
そのプログラムを実行すると、次のような出力が生成されます。
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
4メッセージの変換
Publisher
と
Subscriber
の間で同様のロジックを構築したいが、変換も適用したいとしましょう。
Processor
を実装し、
SubmissionPublisherを拡張した
TransformProcessor__クラスを作成します。
入力を出力に変換する
Function
を渡します。
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
それでは、
Publisher
が
String
要素を公開している処理フローで
簡単なテストを書いてみましょう
。
TransformProcessor
は、
String
を
Integer
として構文解析します。つまり、変換はここで行われる必要があります。
@Test
public void whenSubscribeAndTransformElements__thenShouldConsumeAll()
throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
//when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}
ベース
Publisher
で
close()
メソッドを呼び出すと、
TransformProcessor
で
onComplete()
メソッドが呼び出されることに注意してください。
処理チェーン内のすべてのパブリッシャーはこのようにしてクローズする必要があることに留意してください。
5
Subscription
を使用してメッセージの需要を制御する
Subscriptionの最初の要素だけを使い、ロジックを適用して処理を終了したいとしましょう。これを実現するために
request()
メソッドを使用できます。
N個のメッセージのみを消費するように
EndSubscriber
を変更しましょう。
その数を
howMuchMessagesConsume
コンストラクタ引数として渡します。
public class EndSubscriber<T> implements Subscriber<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1);
}
}
//...
}
必要に応じて要素を要求できます。
指定された__Subscriptionから1つの要素のみを消費するテストを書きましょう。
@Test
public void whenRequestForOnlyOneElement__thenShouldConsumeOne()
throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}
publisher
は6つの要素を公開していますが、
EndSubscriber
は1つの要素のみを処理することを要求するので、1つの要素のみを消費します。
Subscriptionで
request()__メソッドを使用することで、メッセージの消費速度を制御するためのより洗練されたバックプレッシャーメカニズムを実装できます。
6. 結論
この記事では、Java 9 Reactive Streamsについて説明しました。
Publisher
と
Subscriberからなる処理
Flow__を作成する方法を見ました。
最後に、
Subscriptionberを使用して要素の需要を制御するために
Subscription__を使用しました。
これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-9[GitHubプロジェクト]にあります。そのままインポートして実行するのは簡単です。