1. 概要

Webクライアントがサーバーとの対話を維持することを希望する場合、WebSocketは便利なソリューションになります。 WebSocketは、永続的な全二重接続を維持します。 これサーバーとクライアント間で双方向メッセージを送信する機能を提供します。 

このチュートリアルでは、 PlayFrameworkAkkaでWebSocketを使用する方法を学習します。

2. 設定

簡単なチャットアプリケーションを設定しましょう。 ユーザーはサーバーにメッセージを送信し、サーバーはJSONPlaceholderからのメッセージで応答します。

2.1. PlayFrameworkアプリケーションのセットアップ

PlayFrameworkを使用してこのアプリケーションを構築します。

Introduction to Play in Java の手順に従って、簡単なPlayFrameworkアプリケーションをセットアップして実行しましょう。

2.2. 必要なJavaScriptファイルの追加

また、クライアント側のスクリプトにはJavaScriptを使用する必要があります。 これにより、サーバーからプッシュされた新しいメッセージを受信できるようになります。 これには、jQueryライブラリを使用します。

app / views / i ndex.scala.htmlファイルの下部にjQueryを追加しましょう。

<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>

2.3. Akkaのセットアップ

最後に、Akkaを使用してサーバー側のWebSocket接続を処理します。

build.sbt ファイルに移動して、依存関係を追加しましょう。

akka-actorおよびakka-testkitの依存関係を追加する必要があります。

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Akka Frameworkコードを使用およびテストできるようにするには、これらが必要です。

次に、Akkaストリームを使用します。 それでは、akka-stream依存関係を追加しましょう。

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

最後に、AkkaアクターからRESTエンドポイントを呼び出す必要があります。 このためには、akka-http依存関係が必要です。 その場合、エンドポイントは逆シリアル化する必要があるJSONデータを返すため、akka-http-jackson依存関係も追加する必要があります。

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

そして今、私たちはすべて準備ができています。 WebSocketを機能させる方法を見てみましょう!

3. AkkaアクターによるWebSocketの処理

PlayのWebSocket処理メカニズムは、Akkaストリームを中心に構築されています。 WebSocketはフローとしてモデル化されます。 そのため、着信WebSocketメッセージはフローに送られ、フローによって生成されたメッセージはクライアントに送信されます。

アクターを使用してWebSocketを処理するには、ActorRefをフローに変換するPlayユーティリティActorFlowが必要です。 これには主に、少し構成を加えたJavaコードが必要です。

3.1. WebSocketコントローラーメソッド

まず、Materializerインスタンスが必要です。 マテリアライザーは、ストリーム実行エンジンのファクトリです。

ActorSystemMaterializerをコントローラーapp/ controllers /HomeController.javaに挿入する必要があります。

private ActorSystem actorSystem;
private Materializer materializer;

@Inject
public HomeController(
  ActorSystem actorSystem, Materializer materializer) {
    this.actorSystem = actorSystem;
    this.materializer = materializer;
}

次に、ソケットコントローラーメソッドを追加しましょう。

public WebSocket socket() {
    return WebSocket.Json
      .acceptOrResult(this::createActorFlow);
}

ここでは、リクエストヘッダーを受け取り、futureを返す関数acceptOrResultを呼び出しています。 返されるfutureは、WebSocketメッセージを処理するためのフローです。

代わりに、リクエストを拒否して拒否結果を返すことができます。

それでは、フローを作成しましょう。

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      F.Either.Right(createFlowForActor()));
}

PlayFrameworkのFクラスは、関数型プログラミングスタイルヘルパーのセットを定義します。 この場合、 F. Either.Right を使用して接続を受け入れ、フローを返します。

クライアントが認証されていないときに接続を拒否したいとします。

このため、セッションでユーザー名が設定されているかどうかを確認できます。 そうでない場合は、HTTP403Forbiddenとの接続を拒否します。

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow2(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      request.session()
      .getOptional("username")
      .map(username -> 
        F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
          createFlowForActor()))
      .orElseGet(() -> F.Either.Left(forbidden())));
}

F.Either.Right でフローを提供するのと同じ方法で、F.Either.Leftを使用して接続を拒否します。

最後に、フローをメッセージを処理するアクターにリンクします。

private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
    return ActorFlow.actorRef(out -> Messenger.props(out), 
      actorSystem, materializer);
}

ActorFlow.actorRefは、メッセンジャーアクターによって処理されるフローを作成します。

3.2. routesファイル

それでは、 conf /routersにコントローラーメソッドのroutes定義を追加しましょう。

GET  /                    controllers.HomeController.index(request: Request)
GET  /chat                controllers.HomeController.socket
GET  /chat/with/streams   controllers.HomeController.akkaStreamsSocket
GET  /assets/*file        controllers.Assets.versioned(path="/public", file: Asset)

これらのルート定義は、 JavaのPlayアプリケーションでのルーティングで説明されているように、着信HTTP要求をコントローラーアクションメソッドにマップします。

3.3. アクターの実装

アクタークラスの最も重要な部分は、アクターが処理できるメッセージを決定するcreateReceiveメソッドです。

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(JsonNode.class, this::onSendMessage)
      .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
      .build();
}

アクターは、JsonNodeクラスに一致するすべてのメッセージをonSendMessageハンドラーメソッドに転送します。

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    //..
    processMessage(requestDTO);
}

次に、ハンドラーはprocessMessageメソッドを使用してすべてのメッセージに応答します。

private void processMessage(RequestDTO requestDTO) {
    CompletionStage<HttpResponse> responseFuture = getRandomMessage();
    responseFuture.thenCompose(this::consumeHttpResponse)
      .thenAccept(messageDTO ->
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

3.4. AkkaHTTPでRESTAPIを使用する

JSONPlaceholderPostsにあるダミーメッセージジェネレーターにHTTPリクエストを送信します。 応答が到着すると、outと書き込んで応答をクライアントに送信します。

ランダムな投稿IDでエンドポイントを呼び出すメソッドを作成しましょう。

private CompletionStage<HttpResponse> getRandomMessage() {
    int postId = ThreadLocalRandom.current().nextInt(0, 100);
    return Http.get(getContext().getSystem())
      .singleRequest(HttpRequest.create(
        "https://jsonplaceholder.typicode.com/posts/" + postId));
}

また、JSON応答を取得するために、サービスの呼び出しから取得したHttpResponseも処理しています。

private CompletionStage<MessageDTO> consumeHttpResponse(
  HttpResponse httpResponse) {
    Materializer materializer = 
      Materializer.matFromSystem(getContext().getSystem());
    return Jackson.unmarshaller(MessageDTO.class)
      .unmarshal(httpResponse.entity(), materializer)
      .thenApply(messageDTO -> {
          log.info("Received message: {}", messageDTO);
          discardEntity(httpResponse, materializer);
          return messageDTO;
      });
}

MessageConverter クラスは、JsonNodeとDTOの間で変換するためのユーティリティです。

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.convertValue(jsonNode, MessageDTO.class);
}

次に、エンティティを破棄する必要があります。  discardEntityBytes コンビニエンスメソッドは、目的がない場合にエンティティを簡単に破棄するという目的を果たします。

バイトを破棄する方法を見てみましょう。

private void discardEntity(
  HttpResponse httpResponse, Materializer materializer) {
    HttpMessage.DiscardedEntity discarded = 
      httpResponse.discardEntityBytes(materializer);
    discarded.completionStage()
      .whenComplete((done, ex) -> 
        log.info("Entity discarded completely!"));
}

WebSocketの処理が完了したので、HTML5WebSocketを使用してこのためのクライアントをセットアップする方法を見てみましょう。

4. WebSocketクライアントのセットアップ

クライアントのために、簡単なWebベースのチャットアプリケーションを作成しましょう。

4.1. コントローラのアクション

インデックスページをレンダリングするコントローラーアクションを定義する必要があります。 これをコントローラークラスapp.controllers.HomeControllerに配置します。

public Result index(Http.Request request) {
    String url = routes.HomeController.socket()
      .webSocketURL(request);
    return ok(views.html.index.render(url));
}

4.2. テンプレートページ

それでは、 app / views / ndex.scala.html ページに移動して、受信したメッセージのコンテナーと新しいメッセージをキャプチャするためのフォームを追加しましょう。

<div id="messageContent"></div>F
<form>
    <textarea id="messageInput"></textarea>
    <button id="sendButton">Send</button>
</form>

また、 app / views / index.scala.html ページの上部でこのパラメーターを宣言して、WebSocketコントローラーアクションのURLを渡す必要があります。

@(url: String)

4.3. JavaScriptのWebSocketイベントハンドラー

これで、JavaScriptを追加してWebSocketイベントを処理できます。 簡単にするために、 app / views /index.scala.htmlページの下部にJavaScript関数を追加します。

イベントハンドラーを宣言しましょう:

var webSocket;
var messageInput;

function init() {
    initWebSocket();
}

function initWebSocket() {
    webSocket = new WebSocket("@url");
    webSocket.onopen = onOpen;
    webSocket.onclose = onClose;
    webSocket.onmessage = onMessage;
    webSocket.onerror = onError;
}

ハンドラー自体を追加しましょう:

function onOpen(evt) {
    writeToScreen("CONNECTED");
}

function onClose(evt) {
    writeToScreen("DISCONNECTED");
}

function onError(evt) {
    writeToScreen("ERROR: " + JSON.stringify(evt));
}

function onMessage(evt) {
    var receivedData = JSON.parse(evt.data);
    appendMessageToView("Server", receivedData.body);
}

次に、出力を表示するために、関数appendMessageToViewおよびwriteToScreenを使用します。

function appendMessageToView(title, message) {
    $("#messageContent").append("<p>" + title + ": " + message + "</p>");
}

function writeToScreen(message) {
    console.log("New message: ", message);
}

4.4. アプリケーションの実行とテスト

アプリケーションをテストする準備ができたので、実行してみましょう。

cd websockets
sbt run

アプリケーションが実行されている状態で、 http:// localhost:9000 にアクセスして、サーバーとチャットできます。

メッセージを入力してSendを押すたびに、サーバーはJSONプレースホルダーサービスからの loremipsumですぐに応答します。

5. AkkaStreamsを使用してWebSocketを直接処理する

ソースからのイベントのストリームを処理してクライアントに送信する場合、これをAkkaストリームを中心にモデル化できます。

サーバーが2秒ごとにメッセージを送信する例でAkkaストリームを使用する方法を見てみましょう。

HomeControllerのWebSocketアクションから始めます。

public WebSocket akkaStreamsSocket() {
    return WebSocket.Json.accept(request -> {
        Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
        MessageDTO messageDTO = 
          new MessageDTO("1", "1", "Title", "Test Body");
        Source<JsonNode, ?> out = Source.tick(
          Duration.ofSeconds(2),
          Duration.ofSeconds(2),
          MessageConverter.messageToJsonNode(messageDTO)
        );
        return Flow.fromSinkAndSource(in, out);
    });
}

Source# tickメソッドは3つのパラメーターを取ります。 1つ目は、最初のティックが処理される前の初期遅延であり、2つ目は、連続するティック間の間隔です。 上記のスニペットでは、両方の値を2秒に設定しています。 3番目のパラメーターは、ティックごとに返されるオブジェクトです。

これが実際に動作することを確認するには、 index アクションのURLを変更し、akkaStreamsSocketエンドポイントを指すようにする必要があります。

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

そして今、ページを更新すると、2秒ごとに新しいエントリが表示されます。

6. アクターの終了

ある時点で、ユーザーリクエストまたはタイムアウトのいずれかによってチャットをシャットダウンする必要があります。

6.1. アクターの終了の処理

WebSocketが閉じられたことをどのように検出しますか?

WebSocketを処理するアクターが終了すると、Playは自動的にWebSocketを閉じます。 したがって、 Actor#postStop メソッドを実装することで、このシナリオを処理できます。

@Override
public void postStop() throws Exception {
    log.info("Messenger actor stopped at {}",
      OffsetDateTime.now()
      .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. アクターを手動で終了する

さらに、アクターを停止する必要がある場合は、PoisonPillをアクターに送信できます。 このサンプルアプリケーションでは、「停止」リクエストを処理できるはずです。

onSendMessageメソッドでこれを行う方法を見てみましょう。

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    if("stop".equals(message)) {
        MessageDTO messageDTO = 
          createMessageDTO("1", "1", "Stop", "Stopping actor");
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
        self().tell(PoisonPill.getInstance(), getSelf());
    } else {
        log.info("Actor received. {}", requestDTO);
        processMessage(requestDTO);
    }
}

メッセージを受信すると、それが停止リクエストであるかどうかを確認します。 そうである場合は、PoisonPillを送信します。 それ以外の場合は、リクエストを処理します。

7. 構成オプション

WebSocketの処理方法に関して、いくつかのオプションを構成できます。 いくつか見てみましょう。

7.1. WebSocketフレームの長さ

WebSocket通信には、データフレームの交換が含まれます。

WebSocketのフレーム長は構成可能です。 アプリケーションの要件に合わせてフレームの長さを調整するオプションがあります。

短いフレーム長を設定すると、長いデータフレームを使用するサービス拒否攻撃を減らすのに役立つ場合があります。 application.conf で最大長を指定することにより、アプリケーションのフレーム長を変更できます。

play.server.websocket.frame.maxLength = 64k

コマンドラインパラメーターとして最大長を指定することにより、この構成オプションを設定することもできます。

sbt -Dwebsocket.frame.maxLength=64k run

7.2. 接続アイドルタイムアウト

デフォルトでは、WebSocketの処理に使用するアクターは1分後に終了します。 これは、アプリケーションが実行されているPlayサーバーのデフォルトのアイドルタイムアウトが60秒であるためです。これは、60秒以内にリクエストを受信しないすべての接続が自動的に閉じられることを意味します。

これは、構成オプションを使用して変更できます。 application.conf にアクセスして、アイドルタイムアウトが発生しないようにサーバーを変更しましょう。

play.server.http.idleTimeout = "infinite"

または、オプションをコマンドライン引数として渡すこともできます。

sbt -Dhttp.idleTimeout=infinite run

build.sbtdevSettingsを指定してこれを構成することもできます。

build.sbt で指定された構成オプションは、開発でのみ使用され、本番環境では無視されます。

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

アプリケーションを再実行しても、アクターは終了しません。

値を秒に変更できます。

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

利用可能な構成オプションの詳細については、PlayFrameworkのドキュメントを参照してください。

8. 結論

このチュートリアルでは、AkkaアクターとAkkaStreamsを使用してPlayFrameworkにWebSocketを実装しました。

次に、Akkaアクターを直接使用する方法を確認し、次にAkkaStreamsをセットアップしてWebSocket接続を処理する方法を確認しました。

クライアント側では、JavaScriptを使用してWebSocketイベントを処理しました。

最後に、使用できるいくつかの構成オプションについて説明しました。

いつものように、このチュートリアルのソースコードはGitHubから入手できます。