1. 序章

Ratpackは、 Netty エンジン上に構築されたフレームワークであり、HTTPアプリケーションをすばやく構築できます。 その基本的な使用法については、以前の記事ですでに説明しました。 今回は、ストリーミングAPIを使用してリアクティブアプリケーションを実装する方法を示します

2. リアクティブストリームの簡単な要約

実際の実装に入る前に、まずリアクティブアプリケーションを構成するものについて簡単に要約してみましょう。 の元の作成者によると、このようなアプリケーションには次のプロパティが必要です。

  • レスポンシブ
  • 弾力性
  • 弾性
  • メッセージドリブン

では、リアクティブストリームは、これらのプロパティのいずれかを実現するのにどのように役立ちますか? このコンテキストでは、メッセージ駆動型は必ずしもメッセージングミドルウェアの使用を意味するわけではありません。 代わりに、この点に対処するために実際に必要なのは、非同期要求処理とノンブロッキングバックプレッシャのサポートです。

Ratpackリアクティブサポートは、JVMのReactiveStreamsAPI標準を実装のベースとして使用します。 そのため、ProjectReactorやRxJavaなどの他の互換性のあるフレームワークとの相互運用が可能になります。

3. RatpacksのStreamsクラスの使用

RatpackのStreamsクラスは、 Publisher インスタンスを作成するためのいくつかのユーティリティメソッドを提供します。これらのインスタンスを使用して、データ処理パイプラインを作成できます。

良い出発点はpublish()メソッドです。これを使用して、任意のIterableからPublisherを作成できます。

Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();

ここで、 LoggingSubscriber は、パブリッシャーによって発行されたすべてのオブジェクトをログに記録するSubscriberインターフェイスのテスト実装です。 また、名前が示すように、パブリッシャーがすべてのオブジェクトを発行するかエラーを生成するまで呼び出し元をブロックするヘルパーメソッド block()も含まれています。

テストケースを実行すると、予想される一連のイベントが表示されます。

onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908

もう1つの便利なメソッドはyield()です。 これには、 YieldRequest オブジェクトを受け取り、次に発行するオブジェクトを返す単一のFunctionパラメーターがあります。

@Test
public void whenYield_thenSuccess() {
    
    Publisher<String> pub = Streams.yield((t) -> {
        return t.getRequestNum() < 5 ? "hello" : null;
    });
    
    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

YieldRequest パラメーターを使用すると、 getRequestNum()メソッドを使用して、これまでに発行されたオブジェクトの数に基づいてロジックを実装できます。 この例では、この情報を使用して終了条件を定義します。これは、null値を返すことによって通知します。

それでは、定期的なイベント用にPublisherを作成する方法を見てみましょう。

@Test
public void whenPeriodic_thenSuccess() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
        return t < 5 ? String.format("hello %d",t): null; 
    });

    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

返されたパブリッシャーは、 ScheduledExecutorService を使用して、null値を返すまでプロデューサー関数を定期的に呼び出します。 プロデューサー関数は、ストリームを終了するために使用する、すでに放出されたオブジェクトの数に対応する整数値を受け取ります。

4. TransformablePublisherを使用する

Streamsのメソッドを詳しく見ると、通常はTransformablePublisherを返すことがわかります。 このインターフェイスは、ProjectReactorのFluxおよびMono にあるものと同様に、 Publisher をいくつかのユーティリティメソッドで拡張し、複雑な処理を簡単に作成できるようにします。個々のステップからのパイプライン

例として、 map メソッドを使用して、整数のシーケンスを文字列に変換してみましょう。

@Test
public void whenMap_thenSuccess() throws Exception {
    TransformablePublisher<String> pub = Streams.yield( t -> {
        return t.getRequestNum() < 5 ? t.getRequestNum() : null;
      })
      .map(v -> String.format("item %d", v));
    
    ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
    assertTrue("should succeed", result.isSuccess());
    assertEquals("should have 5 items",5,result.getValue().size());
}

ここで、実際の実行は、テストユーティリティクラスExecHarnessによって管理されるスレッドプール内で行われます。 yieldSingle() Promise を想定しているため、 toList()を使用してパブリッシャーを適合させます。 このメソッドは、サブスクライバーによって生成されたすべての結果を収集し、それらをリストに格納します。

ドキュメントに記載されているように、この方法を使用する場合は注意が必要です。 無制限のパブリッシャーに適用すると、JVMのメモリがすぐに不足する可能性があります。 この状況を回避するには、その使用を主に単体テストに限定しておく必要があります

map()の他に、TransformablePublisherにはいくつかの便利な演算子があります。

  • filter()述語に基づいてアップストリームオブジェクトをフィルタリングします
  • take():アップストリームPublisherから最初のnオブジェクトのみを発行します
  • wiretap():パイプラインを流れるデータとイベントを検査できる観測ポイントを追加します
  • reduce():アップストリームオブジェクトを単一の値に減らします
  • transform():通常のPublisherをストリームに挿入します

5. buffer()を非準拠のパブリッシャーで使用する

一部のシナリオでは、要求されたよりも多くのアイテムをサブスクライバーに送信するパブリッシャーに対処する必要があります。 これらのシナリオに対処するために、RatpackのStreamsは buffer()メソッドを提供します。このメソッドは、サブスクライバーがそれらを消費するまで、これらの余分なアイテムをメモリに保持します。

これがどのように機能するかを説明するために、要求されたアイテムの数を無視する単純な非準拠のPublisherを作成しましょう。 代わりに、常に要求よりも少なくとも5つのアイテムが生成されます。

private class NonCompliantPublisher implements Publisher<Integer> {

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        log.info("subscribe");
        subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
    }
    
    private class NonCompliantSubscription implements Subscription {
        private Subscriber<? super Integer> subscriber;
        private int recurseLevel = 0;

        public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            log.info("request: n={}", n);
            if ( recurseLevel > 0 ) {
               return;
            }
            recurseLevel++;
            for (int i = 0 ; i < (n + 5) ; i ++ ) {
                subscriber.onNext(i);
            }
            subscriber.onComplete();
        }

        @Override
        public void cancel() {
        }
    }
}

まず、私たちを使用してこのパブリッシャーをテストしましょう LoggingSubscriber。 を使用します取った() 演算子なので、最初のアイテムだけを受け取ります

@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction(""))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

このテストを実行すると、 cancel()リクエストを受信したにもかかわらず、非準拠のパブリッシャーが新しいアイテムを作成し続けていることがわかります。

RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145

それでは、このストリームに buffer()ステップを追加しましょう。 2つのwiretapステップを追加して、その前にイベントをログに記録します。これにより、その効果がより明確になります。

@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction("before buffer"))
      .buffer()
      .wiretap(new LoggingAction("after buffer"))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

今回は、このコードを実行すると、異なるログシーケンスが生成されます。

LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214

「バッファ前」メッセージは、非準拠のパブリッシャーがrequestへの最初の呼び出し後にすべての値を送信できたことを示しています。 それでも、 LoggingSubscriber によって要求された量を尊重して、ダウンストリーム値が1つずつ送信されました。

6. 遅いサブスクライバーでのbatch()の使用

アプリケーションのスループットを低下させる可能性のある別のシナリオは、ダウンストリームサブスクライバーが少量のデータを要求する場合です。 私たちのLoggingSubscriberは良い例です:一度に1つのアイテムだけを要求します。

実際のアプリケーションでは、これにより多くのコンテキストスイッチが発生し、全体的なパフォーマンスが低下する可能性があります。より適切なアプローチは、一度に多数のアイテムをリクエストすることです。 batch()メソッドを使用すると、アップストリームのパブリッシャーはより効率的なリクエストサイズを使用でき、ダウンストリームのサブスクライバーはより小さなリクエストサイズを使用できます。

これが実際にどのように機能するかを見てみましょう。 前と同じように、バッチのないストリームから始めます。

@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction(""));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

ここで、 CompliancePublisher は、コンストラクターに渡される値までの整数を生成するテストPublisherです。 それを実行して、バッチ処理されていない動作を確認しましょう。

CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331

出力は、プロデューサーが値を1つずつ出力することを示しています。 次に、ステップ batch()をパイプラインに追加して、アップストリームパブリッシャーが一度に最大5つのアイテムを生成するようにします。

@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
    
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction("before batch"))
      .batch(5, Action.noop())
      .wiretap(new LoggingAction("after batch"));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

batch()メソッドは、2つの引数を取ります。各 request()呼び出しで要求されたアイテムの数と、破棄されたアイテム、つまりアイテムを処理するためのActionです。要求されましたが、消費されていません。 この状況は、エラーが発生した場合、またはダウンストリームサブスクライバーが cancel()を呼び出した場合に発生する可能性があります。 結果の実行ログを見てみましょう。

LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690

パブリッシャーが毎回5つのアイテムのリクエストを受け取るようになっていることがわかります。 このテストシナリオでは、ロギングサブスクライバーが最初のアイテムを取得する前でも、プロデューサーへのtwo要求が表示されることに注意してください。 その理由は、このテストシナリオではシングルスレッド実行であるため、 batch ()は、 onComplete()シグナルを取得するまでアイテムをバッファリングし続けるためです。

7. Webアプリケーションでのストリームの使用

Ratpackは、非同期Webフレームワークと組み合わせたリアクティブストリームの使用をサポートしています。

7.1. データストリームの受信

着信データの場合、ハンドラーのContextを介して利用可能なRequestオブジェクトは、 TransformablePublisherを返すgetBodyStream()メソッドを提供します]ByteBufオブジェクト。

このパブリッシャーから、処理パイプラインを構築できます。

@Bean
public Action<Chain> uploadFile() {
    
    return chain -> chain.post("upload", ctx -> {
        TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
        pub.subscribe(new Subscriber<ByteBuf>() {
            private Subscription sub;
            @Override
            public void onSubscribe(Subscription sub) {
                this.sub = sub;
                sub.request(1);
            }

            @Override
            public void onNext(ByteBuf t) {
                try {
                    ... do something useful with received data
                    sub.request(1);
                }
                finally {
                    // DO NOT FORGET to RELEASE !
                    t.release();
                }
            }

            @Override
            public void onError(Throwable t) {
                ctx.getResponse().status(500);
            }

            @Override
            public void onComplete() {
                ctx.getResponse().status(202);
            }
        }); 
    });
}

サブスクライバーを実装する際に考慮すべき詳細がいくつかあります。 まず、ある時点で ByteBufrelease()メソッドを呼び出すようにする必要があります。 これを怠ると、メモリリークが発生します。 次に、非同期処理では、Ratpackのプリミティブのみを使用する必要があります。 それらには、 Promise Blocking、および同様の構成が含まれます。

7.2. データストリームの送信

データストリームを送信する最も直接的な方法は、 Response.sendStream()を使用することです。 このメソッドは、 ByteBuf パブリッシャー引数を取り、データをクライアントに送信し、オーバーフローを回避するために必要に応じてバックプレッシャを適用します。

@Bean
public Action<Chain> download() {
    return chain -> chain.get("download", ctx -> {
        ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
    });
}

単純ですが、この方法を使用すると欠点があります。クライアントにとって問題となる可能性のあるContent-Lengthを含むヘッダーが単独で設定されない:

$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted

あるいは、より良いメソッドは、ハンドルのContext render()メソッドを使用して、ResponseChunksオブジェクトを渡すことです。 この場合、応答は「chunked」転送エンコード方式を使用します。 ResponseChunks インスタンスを作成する最も簡単な方法は、このクラスで使用可能な静的メソッドの1つを使用することです。

@Bean
public Action<Chain> downloadChunks() {
    return chain -> chain.get("downloadChunks", ctx -> {
        ctx.render(ResponseChunks.bufferChunks("application/octetstream",
          new RandomBytesPublisher(1024,512)));
    });
}

この変更により、応答にcontent-typeヘッダーが含まれるようになりました。

$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted

7.3. サーバー側イベントの使用

サーバーサイドイベント(SSE)のサポートでも、 render()メソッドが使用されます。 ただし、この場合、 ServerSentEvents を使用して、ProducerからのアイテムをEventオブジェクトに適合させます。

@Bean
public Action<Chain> quotes() {
    ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
        evt
          .id(Long.toString(idSeq.incrementAndGet()))
          .event("quote")
          .data( q -> q.toString());
    });
    
    return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}

ここで、 QuotesService は、定期的にランダムな見積もりを生成するPublisherを作成するサンプルサービスです。 2番目の引数は、イベントを送信する準備をする関数です。 これには、 id 、イベントタイプ、およびペイロード自体の追加が含まれます。

curl を使用してこのメソッドをテストし、ランダムな引用符のシーケンスとイベントメタデータを示す出力を生成できます。

$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]

... more quotes

7.4. WebSocketデータのブロードキャスト

Websockets.websocketBroadcast()を使用して、任意のPublisherからWebSocket接続にデータをパイプできます。

@Bean
public Action<Chain> quotesWS() {
    Publisher<String> pub = Streams.transformable(quotesService.newTicker())
      .map(Quote::toString);
    return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}

ここでは、以前に見たのと同じ QuotesService を、クライアントに見積もりをブロードキャストするためのイベントソースとして使用します。 もう一度curlを使用して、WebSocketクライアントをシミュレートしてみましょう。

$ curl --include -v \
     --no-buffer \
     --header "Connection: Upgrade" \
     --header "Upgrade: websocket" \
     --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
     --header "Sec-WebSocket-Version: 13" \
     http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=

<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted

8. 結論

この記事では、Ratpackによるリアクティブストリームのサポートと、それをさまざまなシナリオに適用する方法について説明しました。

いつものように、例の完全なソースコードはGitHubにあります。