1. 概要

このチュートリアルでは、JettyからReactiveHTTPクライアントを使用する方法を学習します。 小さなテストケースを作成することにより、さまざまなReactiveライブラリでの使用法を示します。

2. リアクティブHttpClientとは何ですか?

JettyのHttpClientを使用すると、HTTPリクエストのブロックを実行できます。 ただし、Reactive APIを扱っている場合、標準のHTTPクライアントを使用することはできません。 このギャップを埋めるために、Jettyは HttpClient APIのラッパーを作成し、 ReactiveStreamsAPIもサポートするようにしました。

Reactive HttpClient は、HTTP呼び出しを介してデータのストリームを消費または生成するために使用されます。

ここで説明する例には、さまざまなReactiveライブラリを使用してJettyサーバーと通信するReactiveHTTPクライアントがあります。 また、Reactive HttpClientによって提供されるリクエストイベントとレスポンスイベントについても説明します。

Project Reactor RxJava 、および Spring WebFlux に関する記事を読んで、リアクティブプログラミングの概念とその用語をよりよく理解することをお勧めします。

3. Mavenの依存関係

の依存関係を追加して例を始めましょうリアクティブストリーム プロジェクトReactor RxJava Spring WebFlux 、 と JettyのリアクティブHTTPクライアント私たちに pom.xml。 これらに加えて、の依存関係を追加します Jettyサーバーサーバーの作成についても同様です。

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.4.19.v20190610</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.11</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.1.9.RELEASE</version>
</dependency>

4. サーバーとクライアントの作成

次に、サーバーを作成し、リクエストの本文をレスポンスに書き込むだけのリクエストハンドラーを追加しましょう。

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

そして、HttpClientと書くことができます。

HttpClient httpClient = new HttpClient();
httpClient.start();

クライアントとサーバーを作成したので、このブロッキングHTTPクライアントを非ブロッキングHTTPクライアントに変換してリクエストを作成する方法を見てみましょう。

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

したがって、ここでは、Jettyによって提供される ReactiveRequest ラッパーにより、ブロッキングHTTPクライアントがリアクティブになりました。 先に進んで、さまざまなリアクティブライブラリでの使用法を見てみましょう。

5. ReactiveStreams使用法

JettyのHttpClientReactiveStreams をネイティブにサポートしているので、そこから始めましょう。

さて、 Reactive Streamsは単なるインターフェースのセットなので、テストのために、単純なブロッキングサブスクライバーを実装しましょう。

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
    BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) { 
        subscription.request(1); 
    }
  
    @Override 
    public void onNext(ReactiveResponse response) { 
        sink.offer(response);
    } 
   
    @Override 
    public void onError(Throwable failure) { } 

    @Override 
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS);
    }   
}

JavaDocに従ってSubscription#request を呼び出す必要があることに注意してください。これは、「このメソッドを介して要求が通知されるまで、パブリッシャーによってイベントは送信されません」と述べています。

また、安全メカニズムを追加して、5秒以内に値が表示されなかった場合にテストを中止できるようにしたことにも注意してください。

これで、HTTPリクエストをすばやくテストできます。

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. プロジェクトリアクターの使用法

ここで、ProjectReactorでReactiveHttpClientを使用する方法を見てみましょう。 パブリッシャーの作成は、前のセクションとほとんど同じです。

パブリッシャーの作成後、プロジェクトReactorの Mono クラスを使用して、リアクティブな応答を取得しましょう。

ReactiveResponse response = Mono.from(publisher).block();

次に、結果の応答をテストできます。

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1. SpringWebFluxの使用法

Spring WebFluxで使用すると、ブロックしているHTTPクライアントをリアクティブクライアントに簡単に変換できます。 Spring WebFluxには、さまざまなHTTPクライアントライブラリで使用できるリアクティブクライアントWebClientが付属しています。 これは、直接のProjectReactorコードを使用する代わりに使用できます。

まず、 JettyClientHttpConnector を使用してJettyのHTTPクライアントをラップし、 WebClient:と結合しましょう。

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

次に、このコネクタを WebClient に渡して、非ブロッキングHTTPリクエストを実行します。

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

次に、作成したばかりのリアクティブHTTPクライアントを使用して実際のHTTP呼び出しを実行し、結果をテストします。

String responseContent = client.post()
  .uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
  .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
  .retrieve()
  .bodyToMono(String.class)
  .block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. RxJava2使用法

次に、ReactiveHTTPクライアントがRxJava2でどのように使用されるかを見てみましょう。 。 

ここにいる間に、例を少し変更して、リクエストに本文を含めましょう。

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
  .content(ReactiveRequest.Content
    .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
  .build();
Publisher<String> publisher = reactiveRequest
  .response(ReactiveResponse.Content.asString());

コードReactiveResponse.Content.asString()は、応答本文を文字列に変換します。 リクエストのステータスのみに関心がある場合は、 ReactionResponse.Content.discard()メソッドを使用してレスポンスを破棄することもできます。

これで、RxJava2を使用して応答を取得することは、実際にはProjectReactorと非常によく似ていることがわかります。 基本的に、Monoの代わりにSingleを使用します。

String responseContent = Single.fromPublisher(publisher)
  .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. リクエストとレスポンスのイベント

Reactive HTTPクライアントは、実行中にいくつかのイベントを発行します。 それらは、要求イベントと応答イベントに分類されます。 これらのイベントは、リアクティブHTTPクライアントのライフサイクルを確認するのに役立ちます。

今回は、リクエストの代わりにHTTPクライアントを使用して、リアクティブリクエストを少し異なる方法で作成しましょう。

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
  .content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
  .build();

次に、HTTPリクエストイベントのPublisherを取得しましょう。

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

それでは、もう一度RxJavaを使用してみましょう。 今回は、イベントタイプを保持するリストを作成し、発生したリクエストイベントをサブスクライブしてリストに入力します。

List<Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
  .map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());

次に、テスト中であるため、応答をブロックして検証できます。

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

同様に、応答イベントをサブスクライブすることもできます。 これらはリクエストイベントサブスクリプションに似ているため、ここでは後者のみを追加しました。 リクエストイベントとレスポンスイベントの両方を含む完全な実装は、この記事の最後にリンクされているGitHubリポジトリにあります。

9. 結論

このチュートリアルでは、Jettyが提供する ReactiveStreams HttpClient 、さまざまなReactiveライブラリでの使用法、およびReactiveリクエストに関連するライフサイクルイベントについて学習しました。

記事に記載されているすべてのコードスニペットは、GitHubリポジトリにあります。