1. 概要

多くの場合、私たちのWebサービスは、仕事をするために他のWebサービスを使用する必要があります。 応答時間を短くしながら、ユーザーの要求に対応するのは難しい場合があります。 外部サービスが遅いと、応答時間が長くなり、システムがより多くのリソースを使用してリクエストを積み上げる可能性があります。 これは、ノンブロッキングアプローチが非常に役立つ場合がある場所です

このチュートリアルでは、 PlayFrameworkアプリケーションからサービスに対して複数の非同期リクエストを実行します。 JavaのノンブロッキングHTTP機能を活用することで、独自のメインロジックに影響を与えることなく、外部リソースをスムーズにクエリできるようになります。

この例では、 Play WebServiceLibraryについて説明します。

2. Play WebService(WS)ライブラリ

WSは、Java Actionを使用して非同期HTTP呼び出しを提供する強力なライブラリです。

このライブラリを使用して、コードはこれらのリクエストを送信し、ブロックせずに続行します。 リクエストの結果を処理するために、消費関数、つまりConsumerインターフェースの実装を提供します。

このパターンは、JavaScriptのコールバックの実装、 Promises、、および async /awaitパターンといくつかの類似点を共有しています。

応答データの一部をログに記録する単純なConsumerを作成してみましょう。

ws.url(url)
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()))

Consumerはこの例にログインしているだけです。 ただし、結果をデータベースに保存するなど、消費者は結果に対して必要なことは何でもできます。

ライブラリの実装を詳しく調べると、WSがJavaの AsyncHttpClient をラップして構成していることがわかります。これは、標準のJDKの一部であり、Playに依存しません。

3. サンプルプロジェクトを準備する

フレームワークを試すために、リクエストを起動するための単体テストをいくつか作成しましょう。 それらに応答するスケルトンWebアプリケーションを作成し、WSフレームワークを使用してHTTPリクエストを作成します。

3.1. スケルトンWebアプリケーション

まず、 sbtnewコマンドを使用して初期プロジェクトを作成します。

sbt new playframework/play-java-seed.g8

次に、新しいフォルダーで、 build.sbtファイルを編集し、WSライブラリの依存関係を追加します。

libraryDependencies += javaWs

これで、 sbtrunコマンドを使用してサーバーを起動できます。

$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---

[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

アプリケーションが起動したら、 http:// localhost:9000 を参照して、すべてが正常であることを確認できます。これにより、Playのウェルカムページが開きます。

3.2. テスト環境

アプリケーションをテストするには、単体テストクラスHomeControllerTestを使用します。

まず、サーバーのライフサイクルを提供するWithServerを拡張する必要があります。

public class HomeControllerTest extends WithServer {

親のおかげで、このクラスは、テストを実行する前に、スケルトンWebサーバーをテストモードでランダムポートで起動するようになりました。 WithServer クラスも、テストが終了するとアプリケーションを停止します。

次に、実行するアプリケーションを提供する必要があります。

GuiceGuiceApplicationBuilderで作成できます。

@Override
protected Application provideApplication() {
    return new GuiceApplicationBuilder().build();
}

最後に、テストサーバーから提供されたポート番号を使用して、テストで使用するサーバーURLを設定します。

@Override
@Before
public void setup() {
    OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
    if (optHttpsPort.isPresent()) {
        port = optHttpsPort.getAsInt();
        url = "https://localhost:" + port;
    } else {
        port = testServer.getRunningHttpPort()
          .getAsInt();
        url = "http://localhost:" + port;
    }
}

これで、テストを作成する準備が整いました。 包括的なテストフレームワークにより、テストリクエストのコーディングに集中できます。

4. WSRequestを準備します

GETやPOSTなどの基本的なタイプのリクエストと、ファイルアップロードのマルチパートリクエストを実行する方法を見てみましょう。

4.1. WSRequestオブジェクトを初期化します

まず、リクエストを構成および初期化するために、WSClientインスタンスを取得する必要があります。

実際のアプリケーションでは、依存性注入を介して、デフォルト設定で自動構成されたクライアントを取得できます。

@Autowired
WSClient ws;

ただし、テストクラスでは、 Play TestFrameworkから入手できるWSTestClientを使用します。

WSClient ws = play.test.WSTestClient.newClient(port);

クライアントを取得したら、urlメソッドを呼び出してWSRequestオブジェクトを初期化できます。

ws.url(url)

urlメソッドは、リクエストを実行するのに十分です。 ただし、いくつかのカスタム設定を追加することで、さらにカスタマイズできます。

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + num);

ご覧のとおり、ヘッダーとクエリパラメータを追加するのは非常に簡単です。

リクエストを完全に設定したら、メソッドを呼び出してリクエストを開始できます。

4.2. 一般的なGETリクエスト

GETリクエストをトリガーするには、WSRequestオブジェクトでgetメソッドを呼び出す必要があります。

ws.url(url)
  ...
  .get();

これは非ブロッキングコードであるため、リクエストを開始してから、関数の次の行で実行を続行します。

getによって返されるオブジェクトは、 CompletableFutureAPIの一部であるCompletionStageインスタンスです。

HTTP呼び出しが完了すると、このステージはいくつかの命令を実行します。 応答をWSResponseオブジェクトにラップします。

通常、この結果は実行チェーンの次のステージに渡されます。 この例では、消費関数を提供していないため、結果は失われます。

このため、このリクエストのタイプは「ファイアアンドフォーゲット」です。

4.3. フォームを送信する

フォームの送信は、getの例と大差ありません。

リクエストをトリガーするには、postメソッドを呼び出すだけです。

ws.url(url)
  ...
  .setContentType("application/x-www-form-urlencoded")
  .post("key1=value1&key2=value2");

このシナリオでは、パラメーターとして本文を渡す必要があります。これは、ファイル、jsonまたはxmlドキュメント、 BodyWritable Sourceなどの単純な文字列にすることができます。

4.4. マルチパート/フォームデータを送信する

マルチパートフォームでは、添付ファイルまたはストリームから入力フィールドとデータの両方を送信する必要があります。

これをフレームワークに実装するには、postメソッドとSourceを使用します。

ソース内で、フォームに必要なすべての異なるデータ型をラップできます。

Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> file = 
  new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");

ws.url(url)
...
  .post(Source.from(Arrays.asList(file, data)));

このアプローチではさらに構成が追加されますが、それでも他のタイプの要求と非常によく似ています。

5. 非同期応答を処理する

これまでは、ファイアアンドフォーゲットリクエストのみをトリガーしており、コードは応答データに対して何もしません。

次に、非同期応答を処理するための2つの手法について説明します。

メインスレッドをブロックしてCompletableFuture、を待機するか、Consumerと非同期で消費することができます。

5.1. CompleteableFutureでブロックすることによるプロセス応答

非同期フレームワークを使用している場合でも、コードの実行をブロックして応答を待つことを選択する場合があります。

CompleteableFuture APIを使用して、このシナリオを実装するためにコードにいくつかの変更を加える必要があります。

WSResponse response = ws.url(url)
  .get()
  .toCompletableFuture()
  .get();

これは、たとえば、他の方法では達成できない強力なデータの一貫性を提供する場合に役立ちます。

5.2. 応答を非同期的に処理する

ブロッキングなしで非同期応答を処理するために、応答が利用可能なときに非同期フレームワークによって実行されるコンシューマーまたは関数を提供します。

たとえば、前の例に Consumer を追加して、応答をログに記録しましょう。

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + 1)
  .get()
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()));

次に、ログに応答が表示されます。

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
  "Result" : "ok",
  "Params" : {
    "num" : [ "1" ]
  },
  "Headers" : {
    "accept" : [ "*/*" ],
    "host" : [ "localhost:19001" ],
    "key" : [ "value" ],
    "user-agent" : [ "AHC/2.1" ]
  }
} | Current Time:1579303109613

thenAccept を使用したことは注目に値します。これには、ロギング後に何も返す必要がないため、Consumer関数が必要です。

現在のステージで何かを返し、次のステージで使用できるようにする場合は、代わりに thenApply が必要です。これは、関数を取ります。

これらは、標準のJava機能インターフェースの規則を使用します。

5.3. ラージレスポンスボディ

これまでに実装したコードは、小さな応答やほとんどのユースケースに適したソリューションです。 ただし、数百メガバイトのデータを処理する必要がある場合は、より優れた戦略が必要になります。

注意する必要があります:getやpostloadなどのリクエストメソッドは、応答全体をメモリにロードします。

OutOfMemoryError の可能性を回避するために、 Akka Streams を使用して、メモリをいっぱいにすることなく応答を処理できます。

たとえば、その本文をファイルに書き込むことができます。

ws.url(url)
  .stream()
  .thenAccept(
    response -> {
        try {
            OutputStream outputStream = Files.newOutputStream(path);
            Sink<ByteString, CompletionStage<Done>> outputWriter =
              Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
            response.getBodyAsSource().runWith(outputWriter, materializer);
        } catch (IOException e) {
            log.error("An error happened while opening the output stream", e);
        }
    });

The ストリームメソッドは CompletionStage どこ WSResponse があります getBodyAsStream を提供するメソッドソース

AkkaのSinkを使用して、このタイプのボディを処理する方法をコードに伝えることができます。この例では、OutputStreamを通過するデータを単純に書き込みます。

5.4. タイムアウト

リクエストを作成するときに、特定のタイムアウトを設定することもできるため、時間内に完全なレスポンスを受信しない場合、リクエストは中断されます。

これは、クエリしているサービスが特に遅く、応答を待ってスタックしている開いている接続の山を引き起こす可能性があることがわかった場合に特に便利な機能です。

チューニングパラメータを使用して、すべてのリクエストにグローバルタイムアウトを設定できます。 リクエスト固有のタイムアウトの場合、setRequestTimeoutを使用してリクエストに追加できます。

ws.url(url)
  .setRequestTimeout(Duration.of(1, SECONDS));

ただし、処理するケースはまだ1つあります。すべてのデータを受信した可能性がありますが、Consumerの処理が非常に遅い可能性があります。 これは、大量のデータ処理、データベース呼び出しなどがある場合に発生する可能性があります。

低スループットシステムでは、コードが完了するまでコードを実行させることができます。 ただし、長期にわたる活動を中止したい場合があります。

これを実現するには、コードをfutures処理でラップする必要があります。

コードで非常に長いプロセスをシミュレートしてみましょう。

ws.url(url)
  .get()
  .thenApply(
    result -> { 
        try { 
            Thread.sleep(10000L); 
            return Results.ok(); 
        } catch (InterruptedException e) { 
            return Results.status(SERVICE_UNAVAILABLE); 
        } 
    });

これにより、10秒後に OK 応答が返されますが、それほど長く待ちたくありません。

代わりに、 timeout ラッパーを使用して、1秒以内に待機するようにコードに指示します。

CompletionStage<Result> f = futures.timeout(
  ws.url(url)
    .get()
    .thenApply(result -> {
        try {
            Thread.sleep(10000L);
            return Results.ok();
        } catch (InterruptedException e) {
            return Results.status(SERVICE_UNAVAILABLE);
        }
    }), 1L, TimeUnit.SECONDS);

これで、futureはいずれかの方法で結果を返します。 Consumer が時間内に終了した場合の計算結果、またはfuturesタイムアウトによる例外です。

5.5. 例外の処理

前の例では、結果を返すか、例外で失敗する関数を作成しました。 したがって、両方のシナリオを処理する必要があります。

handleAsync メソッドを使用して、成功シナリオと失敗シナリオの両方を処理できます。

結果が得られた場合は結果を返したい、またはエラーをログに記録して例外を返し、さらに処理したいとします。

CompletionStage<Object> res = f.handleAsync((result, e) -> {
    if (e != null) {
        log.error("Exception thrown", e);
        return e.getCause();
    } else {
        return result;
    }
});

これで、コードは、スローされたTimeoutExceptionを含むCompletionStageを返すはずです。

返された例外オブジェクトのクラスでassertEqualsを呼び出すだけで、それを確認できます。

Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);

テストを実行すると、受け取った例外もログに記録されます。

[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...

6. リクエストフィルター

場合によっては、リクエストがトリガーされる前にロジックを実行する必要があります。

初期化されたらWSRequestオブジェクトを操作できますが、より洗練された手法はWSRequestFilterを設定することです。

フィルタは、初期化中にトリガーメソッドを呼び出す前に設定でき、要求ロジックに付加されます。

WSRequestFilter インターフェイスを実装して独自のフィルターを定義することも、既製のフィルターを追加することもできます。

一般的なシナリオは、リクエストを実行する前に、リクエストがどのように見えるかをログに記録することです。

この場合、AhcCurlRequestLoggerを設定する必要があります。

ws.url(url)
  ...
  .setRequestFilter(new AhcCurlRequestLogger())
  ...
  .get();

結果のログは、curlのような形式になります。

[info] p.l.w.a.AhcCurlRequestLogger - curl \
  --verbose \
  --request GET \
  --header 'key: value' \
  'http://localhost:19001'

logback.xml 構成を変更することで、目的のログレベルを設定できます。

7. キャッシング応答

WSClient は、応答のキャッシュもサポートしています。

この機能は、同じリクエストが複数回トリガーされ、毎回最新のデータが必要ない場合に特に便利です。

また、呼び出しているサービスが一時的にダウンしている場合にも役立ちます。

7.1. キャッシュの依存関係を追加する

キャッシュを構成するには、最初にbuild.sbtに依存関係を追加する必要があります。

libraryDependencies += ehcache

これにより、Ehcacheがキャッシュレイヤーとして構成されます。

Ehcacheが特に必要ない場合は、他のJSR-107キャッシュ実装を使用できます。

7.2. フォースキャッシングヒューリスティック

デフォルトでは、サーバーがキャッシュ構成を返さない場合、PlayWSはHTTP応答をキャッシュしません。

これを回避するには、 application.conf に設定を追加して、ヒューリスティックキャッシュを強制します。

play.ws.cache.heuristics.enabled=true

これにより、リモートサービスのアドバタイズされたキャッシュに関係なく、HTTP応答をキャッシュすることがいつ役立つかを決定するようにシステムが構成されます。

8. 追加のチューニング

外部サービスにリクエストを送信するには、クライアントの構成が必要になる場合があります。 user-agentヘッダーに応じて、リダイレクト、低速サーバー、または一部のフィルタリングを処理する必要がある場合があります。

これに対処するために、 application.conf のプロパティを使用して、WSクライアントを調整できます。

play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# time to wait for the connection to be established
play.ws.timeout.connection=30
# time to wait for data after the connection is open
play.ws.timeout.idle=30
# max time available to complete the request
play.ws.timeout.request=300

基盤となるAsyncHttpClientを直接構成することもできます。

利用可能なプロパティの完全なリストは、AhcConfigのソースコードで確認できます。

9. 結論

この記事では、PlayWSライブラリとその主な機能について説明しました。 プロジェクトを構成し、一般的なリクエストを実行する方法と、同期と非同期の両方でそれらの応答を処理する方法を学びました。

私たちは大量のデータのダウンロードに取り組み、短期間の長期的な活動を削減する方法を見ました。

最後に、パフォーマンスを向上させるためのキャッシュと、クライアントを調整する方法について説明しました。

いつものように、このチュートリアルのソースコードは、GitHubからで入手できます。