1. 概要

すべてのリアクティブシステムのガイドラインであるリアクティブマニフェストは、分散アプリケーションは復元力が必要であると述べています。 回復力を実現するには、障害をアーキテクチャの第一級市民として扱うシステムを開発する必要があります。

Akkaライブラリは、監視のメカニズムを通じて、復元力のあるシステムを実装するためのプリミティブを提供します。 このチュートリアルでは、分散システムの復元力を向上させるためにAkkaと監視を使用する方法を示します。

2. Akkaの依存関係

いつものように、Akka Typedライブラリを使用するには、 akka-actor-typed をインポートする必要があり、テストにはakka-actor-testkit-typedが必要です。

libraryDependencies += "com.typesafe.akka" % "akka-actor-typed_2.12" % "2.6.9",
libraryDependencies += "com.typesafe.akka" % "akka-actor-testkit-typed_2.12" % "2.6.9" % Test

3. シナリオ

まず、Akkaでの監督について話すには、実用的な例が必要です。 したがって、ファイルを提供するWebサーバーを模倣する単純なサービスを実装します。

trait Resource
case class File(id: String, content: Array[Byte], mimeType: String) extends Resource

WebサーバーはGetリクエストのみを処理し、Webサーバーが提供する応答には常に元のパスが含まれます。

case class Get(path: String, replyTo: ActorRef[Response]) extends Request

case class Ok(path: String, resource: Resource) extends Response
case class NotFound(path: String) extends Response
case class BadRequest(path: String) extends Response
case class InternalServerError(path: String, error: String) extends Response

では、Webサーバーは到着したリクエストをどのように処理しますか?

Filesystem にリソースの検索を要求するだけで、ファイルを呼び出し元に返します。

object WebServer {
  def apply(filesystem: ActorRef[Find]): Behavior[Request] =
    Behaviors.receive { (context, message) =>
      message match {
        case Get(path, replyTo) =>
          findInTheFilesystem(filesystem, context, replyTo, path)
        // More to come ;)
      }
      Behaviors.same
    }

今のところ、多くの詳細を省略しています。 ただし、この記事の残りの部分では空白を埋めます。

4. 処理するかしないか

前述のように、分散システムでは、障害を念頭に置いてコンポーネントを開発する必要があります。 しかし、コードでどのような種類のエラーを処理する必要がありますか? 基本的に、検証エラーと失敗の2種類があります。

検証エラーは、入力のエラーやビジネスロジックが管理できるものなど、一般的なエラーを表します。

たとえば、Webサーバーが有効なURIを path として予期しており、着信メッセージにURIではないものが含まれているとします。 この場合、WebサーバーはBadRequestメッセージで発信者に応答します。

Behaviors.receive { (context, message) =>
  message match {
    case Get(path, replyTo) =>
      if (isNotValid(path)) {
        replyTo ! BadRequest(path)
      }

したがって、アクターのドメインでは、検証エラーはアクターとの対話に使用されるプロトコルの一部である必要があります。

一方、障害とは、使用できなくなった外部リソースへの接続や、ファイルシステムがいっぱいになるなど、予期しないエラー状態です。 明らかに、ビジネスロジックと、起こりうるすべての予期しない障害の処理を混在させることはできません。 結果のコードはすぐに保守が困難になります。

したがって、障害を処理する唯一の可能な方法は、障害を処理する責任を、スーパーバイザーと呼ばれる外部ソースに委任することです。

5. 監督:予期しないことを期待する

例を改善する時が来ました。 Webサーバーは、ファイルシステム内のリソースを検索することにより、着信要求を処理します。 ファイルシステムを、そのプロトコルとともに専用のアクターとしてモデル化できます。

object Filesystem {
  def apply(): Behavior[FsFind] = search

  private def search: Behavior[FsFind] = {
    Behaviors.receive { (context, message) =>
      context.log.info(s"Received a request for path ${message.path}")
      message.replyTo !
      if (Random.nextBoolean)
        FsFound(File("id", "{'result': 'ok'}".getBytes(), "application/json"))
      else
        FsMiss
      Behaviors.same
    }
  }
}

前述したように、予期しない障害を処理するコードでビジネスロジックをインターリーブすることはベストプラクティスではありません。 したがって、 Akkaは、俳優の行動の周りの装飾を使用して監視を実装します

object Filesystem {
  def apply(): Behavior[Find] = {
    Behaviors
      .supervise[Find](search)
      .onFailure[Exception](SupervisorStrategy.restart)
  }
 
  // ...

ご覧のとおり、監視では、例外と、障害が発生した場合に使用する戦略の両方を定義します。

この例では、監視は Exception クラスのすべてのサブタイプをキャッチし、アクターを再起動します。 さまざまなタイプの例外に関連付けられたさまざまな監視戦略を指定することができます。 これを行うには、監視メソッドへの複数の呼び出しをネストするだけです。

Behaviors
  .supervise[Find](
    Behaviors
      .supervise(search)
      .onFailure[IOException(SupervisorStrategy.resume))
  .onFailure[Exception](SupervisorStrategy.restart)

発生した障害のタイプに応じて、スーパーバイザーには3つの選択肢があります。

  • 失敗したアクターを再開する
  • アクターを再起動します
  • 俳優を止める

Akkaが上記の各状況にどのように対処するかを見てみましょう。

5.1. 世界を救おうとしています:俳優を再開します

失敗後の最も幸運なケースはエラーがアクターの状態を破壊しなかったとき。 したがって、俳優はその仕事を続けることができます 、次のメッセージを処理します。 たとえば、キャッシュを実装しましょう。常にファイルシステムから情報を読み取る必要はなく、キャッシュの状態をモデル化する必要があります。 簡単にするために、 Map [String、Resource]を使用します。

def cache(filesystem: ActorRef[FsFind],
          cacheMap: Map[String, Resource]): Behavior[Request] =
  Behaviors.receive { (ctx, message) =>
    message match {
      case Find(path, replyTo) =>
        val maybeAnHit = cacheMap.get(path)
        maybeAnHit match {
          case Some(r) => replyTo ! Hit(r)
          case None => askFilesystemForResource(filesystem, ctx, path, replyTo)
        }
        Behaviors.same
      case AdaptedFsFound(path, res, replyTo) =>
        replyTo ! Hit(res)
        cache(filesystem, cacheMap + (path -> res))
      case AdaptedFsMiss(_, replyTo) =>
        replyTo ! Miss
        Behaviors.same
    }
  }

監視戦略SupervisionStrategy.resumeを使用して、障害が発生した後もアクターがメールボックスの読み取りを続行できるようにします。

object Cache {
  def apply(filesystem: ActorRef[FsFind]):
    Behaviors
      .supervise(cache(filesystem, Map[String, Resource]()))
      .onFailure(SupervisorStrategy.resume)
 
  // ...
}

上記の例では、アクターを定義するために機能スタイルを使用しています。 動作を定義する関数を介して情報を渡すことにより、可変状態をエミュレートします。 幸い、アクターの動作を定義するメソッドの最初の呼び出しで監視を定義するだけで十分です。

Akkaライブラリは、cacheメソッドの各呼び出しをsupervise関数で装飾します。

5.2. アクターを再起動するだけです

障害によってアクターの状態が破損した場合は、障害状態が移行状態によるものであることを期待して、再起動する以外に何もする必要はありません

したがって、 SupervisorStrategy.restart を使用して、障害の後にアクターを再起動する必要があることを指定できます。

Behaviors.supervise[Find](search).onFailure[Exception](SupervisorStrategy.restart)

より適切な段階的再起動ポリシーを指定して、監視が特定の時間間隔でアクターを再起動できる最大回数を指定できます。

SupervisorStrategy.restart.withLimit(maxNrOfRetries = 10, withinTimeRange = 5 minutes)

再起動後、アクターのすべての子が停止します。 明らかに、この動作により、親が再起動されるたびに新しい子アクターを作成するリソースリークが防止されます。 ただし、このデフォルトの動作をオーバーライドすることは可能です。

まず、子アクターの作成を監視の外に移動する必要があります。これは、新しいメッセージの受信を処理するためのコードのみを装飾する必要があります。 最後になりましたが、 withStopChildren メソッドを呼び出して、子アクターを停止せず、代わりにアクター間のぶら下がっている参照を管理するようにAkkaに指示します。

def apply(): Behavior[Request] = {
  Behaviors.setup { context =>
    val filesystem = context.spawn(Filesystem(), "filesystem")
    val cache = context.spawn(Cache(filesystem), "cache")
    Behaviors.supervise {
      serve(context, cache)
    }.onFailure[Exception](SupervisorStrategy.restart.withStopChildren(false))
  }
}

対照的に、子アクターが再起動された場合、Akkaは子アクターの参照を親に置き換えます。

すでに述べたように、障害は予測できないイベントです。 損失が発生した場合、アクターはエラーの原因となったリソース以外のリソースを開いている可能性があります。 Akkaは、これらの保留中のリソースをすべて最終的にクリーンアップして閉じるメカニズムを提供します。

実際、再起動の準備ができているすべてのアクターは、聞くことができるPreRestart信号を発します。

receiveSignal は、 Behavior タイプのメソッドであり、アクターの動作の定義の直後に呼び出すことができます。 この例では、 FileSystem アクターが再起動される直前に、アクターが所有するリソースを解放する必要があります。

Behaviors.receive[FsFind] { (context, message) =>
  context.log.info(s"Received a request for path ${message.path}")
  message.replyTo !
  if (Random.nextBoolean)
    FsFound(File("id", "{'result': 'ok'}".getBytes(), "application/json"))
  else
    FsMiss
  Behaviors.same
}.receiveSignal {
  case (ctx, signal) if signal == PreRestart =>
    ctx.log.info("Releasing any dangling resource")
    Behaviors.same
}

5.3. 言うことは何も残っていません:俳優を止めてください

アクターの停止がデフォルトの監視戦略です。 アクターの実行に必要な条件が満たされない場合、アプリケーションの状態が危険にさらされ、アクターを再開または再起動する価値がなくなる可能性があります。 たとえば、スケジュールされたアクションなどについて考えてみます。

アクターシステムがアクターを停止すると、停止しているアクターに PostStop シグナルを送信して、処理されたリソースをクリーンアップします。

実際、前の例で見たPreRestart信号と一緒にPostStop信号を処理することは非常に一般的です。

Behaviors.receive[Find] { (context, message) =>
  // ...
}.receiveSignal {
  case (ctx, signal) if signal == PreRestart || signal ==  PostStop =>
    ctx.log.info("Releasing any dangling resource")
    Behaviors.same
}

前に述べたように、Akkaはアクターを階層に配置します。 子アクターが停止した場合、親はどうなりますか?親アクターのデフォルトの動作は、それ自体が失敗し、階層の上位で失敗をバブリングすることです。 しかし、子役の1人が停止した場合に備えて、親が生き続けたい場合はどうでしょうか。 まず、親アクターはその子を監視する必要があります。

val webServer = context.spawn(WebServer(), message.id)
context.watch(webServer)

アクターを監視するということは、アクターが停止したときにChildFailedシグナルを受信することを意味します。 ChildFailed は、より一般的な信号 Terminated を拡張します:

object Main {
  case class Start(id: String)
  def apply: Behavior[Start] = {
    Behaviors.receive[Start] { (context, message) =>
      val webServer = context.spawn(WebServer(), message.id)
      context.watch(webServer)
      Behaviors.same
    }.receiveSignal {
      case (ctx, ChildFailed(ref, cause)) =>
        ctx.log.error(s"Child actor ${ref.path} failed with error ${cause.getMessage}")
        Behaviors.same
    }
  }
}

ただし、親アクターが ChildFailed シグナルを処理しないことを決定した場合、親アクターはそれ自体を終了し、 DeathPactException を発生させ、それによって失敗をバブリングします。

6. メッセージはどうなりますか?

これまで、例外が発生した場合にアクターがどうなるかについて説明してきました。 しかし、失敗を生成するメッセージはどうなりますか? メッセージは失われ、アクターシステムはメッセージを破棄します。 いかなる状況でも、メッセージはメールボックスに戻されません。

逆に、メールボックスには何も起こりません。 アクターを再開または再起動する必要があると監視戦略で定義されている場合、アクターは残りのすべてのメッセージを含む同じメールボックスを処理し続けます。

7. 結論

この記事では、検証エラーと失敗の違いから始めました。 監視と利用可能な監視戦略を使用して、Akkaライブラリが両方をどのように処理するかを調べました。

いつものように、コードはGitHubから入手できます。