1. 序章

このチュートリアルでは、Jersey APIを使用したリアクティブ(Rx)プログラミングのJAX-RSサポートについて説明します。 この記事は、読者がJerseyRESTクライアントAPIの知識を持っていることを前提としています。

リアクティブプログラミングの概念にある程度精通していると役立ちますが、必須ではありません。

2. 依存関係

まず、標準のJerseyクライアントライブラリの依存関係が必要です。

<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.27</version>
</dependency>
<dependency>
    <groupId>org.glassfish.jersey.inject</groupId>
    <artifactId>jersey-hk2</artifactId>
    <version>2.27</version>
</dependency>

これらの依存関係により、標準のJAX-RSリアクティブプログラミングサポートが提供されます。 jersey-clientおよびjersey-hk2の現在のバージョンは、MavenCentralで入手できます。

サードパーティのリアクティブフレームワークのサポートには、次の拡張機能を使用します。

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-rxjava</artifactId>
    <version>2.27</version>
</dependency>

上記の依存関係は、RxJavaの Observable;のサポートを提供します。新しいRxJava2のFlowable、は次の拡張機能を使用します。

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-rxjava2</artifactId>
    <version>2.27</version>
</dependency>

rxjavaおよびrxjava2への依存関係は、MavenCentralでも利用できます。

3. リアクティブJAX-RSクライアントが必要な理由

消費するRESTAPIが3つあるとしましょう。

  • id-service は、ロングユーザーIDのリストを提供します
  • name-service は、特定のユーザーIDのユーザー名を提供します
  • hash-service は、ユーザーIDとユーザー名の両方のハッシュを返します

サービスごとにクライアントを作成します。

Client client = ClientBuilder.newClient();
WebTarget userIdService = client.target("http://localhost:8080/id-service/ids");
WebTarget nameService 
  = client.target("http://localhost:8080/name-service/users/{userId}/name");
WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}");

これは不自然な例ですが、説明のために機能します。 JAX-RS仕様は、これらのサービスを一緒に利用するための少なくとも3つのアプローチをサポートしています。

  • 同期(ブロッキング)
  • 非同期(ノンブロッキング)
  • リアクティブ(機能的、ノンブロッキング)

3.1. 同期ジャージークライアント呼び出しの問題

これらのサービスを利用するためのバニラアプローチでは、 id-service を利用してユーザーIDを取得し、name-servicehash-serviceを呼び出します。返されたIDごとに順次API。

このアプローチでは、各呼び出しは、要求が満たされるまで実行中のスレッドをブロックします、結合された要求を満たすために合計で多くの時間を費やします。 これは、重要なユースケースでは明らかに満足のいくものではありません。

3.2. 非同期ジャージークライアント呼び出しの問題

より洗練されたアプローチは、JAX-RSでサポートされているInvocationCallbackメカニズムを使用することです。 最も基本的な形式では、 get メソッドにコールバックを渡して、特定のAPI呼び出しが完了したときに何が起こるかを定義します。

これで真の非同期実行(スレッド効率にいくつかの制限がある)が得られますが、このスタイルのコードが些細なシナリオ以外でどのように読めなくなり扱いにくいになるかは簡単にわかります。 JAX-RS仕様は、このシナリオを運命のピラミッドとして具体的に強調しています。

// used to keep track of the progress of the subsequent calls
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); 

userIdService.request()
  .accept(MediaType.APPLICATION_JSON)
  .async()
  .get(new InvocationCallback<List<Long>>() {
    @Override
    public void completed(List<Long> employeeIds) {
        employeeIds.forEach((id) -> {
        // for each employee ID, get the name
        nameService.resolveTemplate("userId", id).request()
          .async()
          .get(new InvocationCallback<String>() {
              @Override
              public void completed(String response) {
                     hashService.resolveTemplate("rawValue", response + id).request()
                    .async()
                    .get(new InvocationCallback<String>() {
                        @Override
                        public void completed(String response) {
                            //complete the business logic
                        }
                        // ommitted implementation of the failed() method
                    });
              }
              // omitted implementation of the failed() method
          });
        });
    }
    // omitted implementation of the failed() method
});

// wait for inner requests to complete in 10 seconds
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
    logger.warn("Some requests didn't complete within the timeout");
}

したがって、非同期で時間効率の良いコードを実現しましたが、次のようになります。

  • 読みにくい
  • 呼び出しごとに新しいスレッドが生成されます

を使用していることに注意してください CountDownLatch すべてのコード例で、すべての期待値がによって配信されるのを待つためにハッシュサービス。 これは、すべての期待値が実際に配信されたことを確認することで、コードが単体テストで機能することを確認できるようにするためです。

通常のクライアントは待機せず、スレッドをブロックしないように、コールバック内の結果に対して実行する必要があることをすべて実行します。

3.3. 機能的で反応的なソリューション

機能的で反応的なアプローチにより、次のことが可能になります。

  • 優れたコードの読みやすさ
  • 流暢なコーディングスタイル
  • 効果的なスレッド管理

JAX-RSは、以下のコンポーネントでこれらの目的をサポートします。

  • CompletionStageRxInvoker は、デフォルトのリアクティブコンポーネントとしてCompleteStageインターフェイスをサポートします
  • RxObservableInvokerProvider は、RxJavaのObservableをサポートします
  • RxFlowableInvokerProviderはRxJavaのFlowableをサポートします

他のリアクティブライブラリのサポートを追加するためのAPIもあります。

4. JAX-RSリアクティブコンポーネントのサポート

4.1. JAX-RSのCompletionStage

CompletionStage とその具体的な実装– CompleteableFuture – を使用して、エレガントで、ノンブロッキングで流暢なサービスコールオーケストレーションを作成できます。

まず、ユーザーIDを取得します。

CompletionStage<List<Long>> userIdStage = userIdService.request()
  .accept(MediaType.APPLICATION_JSON)
  .rx()
  .get(new GenericType<List<Long>>() {
}).exceptionally((throwable) -> {
    logger.warn("An error has occurred");
    return null;
});

rx()メソッド呼び出しは、リアクティブ処理が開始されるポイントです。 例外的に関数を使用して、例外処理シナリオを流暢に定義します。

ここから、呼び出しをクリーンに調整してNameサービスからユーザー名を取得し、名前とユーザーIDの両方の組み合わせをハッシュできます。

List<String> expectedHashValues = ...;
List<String> receivedHashValues = new ArrayList<>(); 

// used to keep track of the progress of the subsequent calls 
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); 

userIdStage.thenAcceptAsync(employeeIds -> {
  logger.info("id-service result: {}", employeeIds);
  employeeIds.forEach((Long id) -> {
    CompletableFuture completable = nameService.resolveTemplate("userId", id).request()
      .rx()
      .get(String.class)
      .toCompletableFuture();

    completable.thenAccept((String userName) -> {
        logger.info("name-service result: {}", userName);
        hashService.resolveTemplate("rawValue", userName + id).request()
          .rx()
          .get(String.class)
          .toCompletableFuture()
          .thenAcceptAsync(hashValue -> {
              logger.info("hash-service result: {}", hashValue);
              receivedHashValues.add(hashValue);
              completionTracker.countDown();
          }).exceptionally((throwable) -> {
              logger.warn("Hash computation failed for {}", id);
              return null;
         });
    });
  });
});

if (!completionTracker.await(10, TimeUnit.SECONDS)) {
    logger.warn("Some requests didn't complete within the timeout");
}

assertThat(receivedHashValues).containsAll(expectedHashValues);

上記のサンプルでは、流暢で読みやすいコードを使用して3つのサービスの実行を構成しています。

メソッドthenAcceptAsyncは、指定された CompleteStage が実行を完了した(または例外をスローした)後、指定された関数を実行します。

連続する各呼び出しは非ブロッキングであり、システムリソースを適切に使用します。

CompletionStageインターフェースは、マルチステップオーケストレーション(または単一のサービスコール)で任意の数のステップを作成、順序付け、非同期で実行できるようにするさまざまなステージングおよびオーケストレーションメソッドを提供します。

4.2. JAX-RSで観測可能な

Observable RxJavaコンポーネントを使用するには、最初に RxObservableInvokerProvider プロバイダー(Jersey仕様書に記載されている「 ObservableRxInvokerProvider」ではない)を登録する必要があります。クライアント:

Client client = client.register(RxObservableInvokerProvider.class);

次に、デフォルトの呼び出し元をオーバーライドします。

Observable<List<Long>> userIdObservable = userIdService
  .request()
  .rx(RxObservableInvoker.class)
  .get(new GenericType<List<Long>>(){});

この時点から、は標準のObservableセマンティクスを使用して、処理フローを調整できます

userIdObservable.subscribe((List<Long> listOfIds)-> { 
  /** define processing flow for each ID */
});

4.3. JAX-RSのFlowable

RxJavaを使用するためのセマンティクス流動性のそれに似ています観察可能。 適切なプロバイダーを登録します。

client.register(RxFlowableInvokerProvider.class);

次に、RxFlowableInvokerを提供します。

Flowable<List<Long>> userIdFlowable = userIdService
  .request()
  .rx(RxFlowableInvoker.class)
  .get(new GenericType<List<Long>>(){});

その後、通常の FlowableAPIを使用できます。

5. 結論

JAX-RS仕様は、REST呼び出しのクリーンでノンブロッキングの実行を実現する多数のオプションを提供します。

特に、 CompletionStage インターフェースは、さまざまなサービスオーケストレーションシナリオをカバーする堅牢な一連のメソッドと、カスタムExecutorsを提供してよりきめ細かい制御を行う機会を提供します。スレッド管理。

この記事のコードはGithubで確認できます。