1. 概要

Tell Pattern を導入した後、実際のシナリオでより頻繁に発生するいくつかの対話、つまり要求/応答パターンの概要を説明します。

2. Akkaの依存関係

いつものように、Akka Typedライブラリを使用するには、 akka-actor-typed をインポートする必要があり、テストにはakka-actor-testkit-typedが必要です。

libraryDependencies += "com.typesafe.akka" % "akka-actor-typed_2.12" % "2.6.8",
libraryDependencies += "com.typesafe.akka" % "akka-actor-testkit-typed_2.12" % "2.6.8" % Test

3. シナリオ

要求と応答のパターンについて話すには、まず、ニーズに合った例が必要です。 それでは、処理するメッセージに応答するアクターを定義しましょう。

着信文字列をBase64文字列にエンコードするサービスが必要だと想像してみてください。 次に、そのようなサービスのクライアントはプレーンな文字列を送信し、サービスがBase64表現を返すことを期待できます。 まず、通信プロトコルを定義します。

sealed trait Request
final case class ToEncode(payload: String, replyTo: ActorRef[Encoded]) extends Request
sealed trait Response
final case class Encoded(payload: String) extends Response

エンコード要求はすべて、エンコードする文字列を含むToEncodeメッセージを使用します。 その後、サービスはEncodedメッセージのインスタンスを使用してエンコードされた文字列を返します。

4. 要求/応答:簡単な方法

アクターに要求/応答パターンを実装させる場合は、ActorRefが必要です。 ActorRef [-T]は、タイプTのメッセージを処理できるアクターを指します。呼び出されたアクターは、要求された情報で応答するために、呼び出し元のActorRefを使用する必要があります。

この例では、 To Encodeメッセージ内のアクター間でActorRef[Encoded]を渡します。 実際、タイプActorRef[Encoded]の属性replyToがあります。

したがって、この情報を使用して、Base64Encoderアクターを実装しましょう。

object Base64Encoder {
  def apply(): Behavior[Encode] =
    Behaviors.receiveMessage {
      case ToEncode(payload, replyTo) =>
        val encodedPayload = Base64.getEncoder.encode(payload.getBytes(StandardCharsets.UTF_8))
        replyTo ! Encoded(encodedPayload.toString)
        Behaviors.same
  }
}

この例では、 Base64Encoder アクターはエンコードの要求を受け取ります。 タイプToのメッセージエンコード エンコードするテキストと呼び出し元のアクターへの参照の両方が含まれます。 ペイロードが正常にエンコードされると、アクターはtellオペレーターを使用して呼び出し元に応答できます。

replyTo ! ToEncoded(encodedPayload.toString)

呼び出し元アクターの1つの可能な実装を見てみましょう。

object NaiveEncoderClient {
  def apply(encoder: ActorRef[Request]): Behavior[Encoded] =
    Behaviors.setup { context =>
      encoder ! ToEncode("The answer is 42", context.self)
      Behaviors.receiveMessage {
        case Encoded(payload) => context.log.info(s"The encoded payload is $payload")
          Behaviors.empty
      }
    }
}

ご覧のとおり、 NaiveEncoderClient は、 context を使用して、それ自体のActorRefを取得します。

context.self

メッセージを送信した後、アクターはエンコードされたメッセージを含む応答の待機を開始します。

上記の相互作用は、応答を伴う最も簡単なものです。 ただし、実際のシナリオで使用できる可能性はほとんどありません。 実際、呼び出し元アクター NaiveEncoderClient は、アクター Base64Encoder –によって定義された応答プロトコルを使用する必要があります。さまざまな種類のメッセージを聞かないでください。

NaiveEncoderClient が別のタイプのメッセージを聞きたい場合はどうなりますか? 多くの場合、呼び出し元のアクターには独自のプロトコルがあります。 この問題をどのように解決できますか? 適応した応答パターンを紹介しましょう。

5. 要求-応答:適応された応答パターン

呼び出し元のアクターに独自のプロトコルを持たせるたびに、適合した応答パターンを使用する必要があります。新しい通信プロトコルを定義しましょう。

sealed trait Command
final case class KeepASecret(secret: String) extends Command

メッセージKeepASecretは、アクターに秘密の文字列を保持するように要求します。 アクターのEncoderClientは、秘密をBase64形式で保持したいと考えています。 したがって、アクターがタイプKeepASecretEncodedの両方のメッセージをリッスンできるようにする戦略が必要です。

幸い、 Akka Typedライブラリは、メッセージアダプタを定義するメカニズムを提供します。これは、Encodedメッセージを特性Commandに適合させることを意味します。

まず、プライベートメッセージアダプタを定義します。

private final case class WrappedEncoderResponse(response: Encoded) extends Command

したがって、アクターはエンコードされたメッセージを受信すると、それを WrappedEncoderResponse。 結果として得られる構造は、実際には メッセージマッパー のメッセージを処理します エンコードされた タイプ:

val encoderResponseMapper: ActorRef[Encoded] =
  context.messageAdapter(response => WrappedEncoderResponse(response))

最後に、マッパーへの参照をEncodedメッセージと一緒に渡す必要があります。 マッパーは応答をアクターに転送し、マッパー宣言中に設定された関数を使用して元のメッセージをラップします。

object EncoderClient {
  def apply(encoder: ActorRef[Request]): Behavior[Command] =
    Behaviors.setup { context =>
      val encoderResponseMapper: ActorRef[Encoded] =
        context.messageAdapter(response => WrappedEncoderResponse(response))
      Behaviors.receiveMessage {
        case KeepASecret(secret) =>
          encoder ! ToEncode(secret, encoderResponseMapper)
          Behaviors.same
        case WrappedEncoderResponse(response) =>
          context.log.info(s"I will keep a secret for you: ${response.payload}")
          Behaviors.same
      }
    }
}

さまざまなメッセージアダプタを登録できますが、着信メッセージの種類ごとに1つのメッセージアダプタしか設定できません。これは、新しく登録したアダプタが同じメッセージクラスの既存のアダプタを置き換えることも意味します。

適応された応答パターンを使用すると、他の多くのプロトコルを使用してさまざまなアクターと対話できます。 ただし、まだいくつかの問題が残っています。 応答をその要求に関連付けるにはどうすればよいですか? 自家製の相関IDを実装したくない場合は、別の戦略を使用する必要があります。 どれを見てみましょう。

6. 要求/応答:質問パターン

Askパターンを使用すると、リクエストを1つのレスポンスに正確に関連付ける必要があるインタラクションを実装できます。したがって、レスポンスをリクエストに関連付けることができるため、より単純な適応レスポンスパターンとは異なります。

6.1. APIゲートウェイシナリオ

パターンをよりよく理解するために、この例をより難しくしましょう。 複雑なシステムのAPIをクライアントから隠すサービスであるAPIGatewayを実装します。 APIゲートウェイは、マイクロサービスアーキテクチャなどのシステムと対話するための、よりアクセスしやすく、より一貫性のある方法を提供します。 この例では、エンコーダサービスのゲートウェイを開発します。

まず、新しいプロトコルを定義する必要があります APIGateway 俳優:

sealed trait Command
final case class PleaseEncode(payload: String, replyTo: ActorRef[GentlyEncoded]) extends Command
final case class GentlyEncoded(encodedPayload: String)

ご覧のとおり、プロトコルは要求と応答を定義し、前者では、呼び出し元は、エンコードされたペイロードの転送を容易にするためにそれ自体への参照を提供します。 したがって、要求と応答が完全に結び付けられていることが重要です。

適応した応答パターンに対して行ったように、適応する必要があります Base64Encoder 俳優のプロトコルからのプロトコルへ APIGateway 俳優。 それでは、アダプターを定義しましょう。 今回は、通信中にエラーが発生した場合に使用するアダプターも定義します(理由はすぐにわかります)。

private case class AdaptedResponse(payload: String, replyTo: ActorRef[GentlyEncoded]) extends Command
private case class AdaptedErrorResponse(error: String) extends Command

お気づきかもしれませんが、 AdaptedResponse メッセージには、APIGatewayの外部呼び出し元への参照が含まれています。 そのような参照は、内部のゲートウェイに到達します エンコードしてください メッセージ。 それらを結び付ける方法を見てみましょう。

6.2. 質問パターンの使用方法

contextオブジェクトによって提供されるaskメソッドを使用します。

implicit val timeout: Timeout = 5.seconds
Behaviors.receiveMessage {
  case PleaseEncode(payload, replyTo) =>
    context.ask(encoder, ref => ToEncode(payload, ref)) {
      case Success(Encoded(encodedPayload)) => AdaptedResponse(encodedPayload, replyTo)
      case Failure(exception) => AdaptedErrorResponse(exception.getMessage)
    }
    Behaviors.same

The 聞く メソッドは2つのパラメーターを取ります。 最初は ActorRef リクエストを送信します。 2つ目は、呼び出し元のアクター自体への参照を指定して、送信するメッセージを生成する関数です。 さらに、 聞く メソッドは Future[エンコード]。 Futuresは、将来利用できる値を表すデータ構造です。 Future は正常に完了するか、発生した例外を含む障害が発生する可能性があります。

したがって、の使用未来 sを使用すると、特定の要求をその応答にバインドできます。 未来 メッセージへの送信は、リクエストと同じコンテキストで実行されます。 だから、安全に使用することが可能です に返信 に保存されている情報 エンコードしてください メッセージ。

の解決を永遠に待つのは正しい選択ではありません 未来。 したがって、次のように定義します。 タイムアウト 俳優の行動の文脈で。

最後に、定義する必要があります APIGateway タイプのメッセージに応じたアクターの動作 AdaptedResponse と AdaptedErrorResponse:

Behaviors.receiveMessage {
  // ...
  case AdaptedResponse(encoded, ref) =>
    ref ! GentlyEncoded(encoded)
    Behaviors.same
  case AdaptedErrorResponse(error) =>
    context.log.error(s"There was an error during encoding: $error")
    Behaviors.same
}

7. 結論

この記事では、Akkaタイプのアクター間の要求と応答の相互作用を開発するために、利用可能なパターンのいくつかを確認しました。 これは完全な話ではなく、さらに複雑な種類の相互作用を実装するより高度なパターンがあることに注意することが重要です。 セッションごとの子役 または 汎用応答アグリゲーター.

いつものように、このチュートリアルで使用される完全なコード例が利用可能です GitHubで.