1. 概要

Akkaフレームワークを使用するScalaのアクターは、JVM上で分散プログラムと並行プログラムを作成するのに役立ちます。 これらのアクターは軽量で、200万を超えるアクターが約1GBのヒープを使用します。 これらは、作成、再起動、または停止できるオブジェクトです。

2. 俳優

Akka Typedの導入により、アクターの作成方法にわずかな変更が加えられました。 クラシックアクターでは、アクタートレイトを拡張する必要がありましたが、Akka Typedでは、代わりに、アクターが着信メッセージにどのように応答するかを制御する型付き動作を定義します。

型付きアクターを作成する方法の例を見てみましょう。

object StringActor {
  def apply() : Behavior[String] = Behaviors.receive {
    case (context, message) =>
      println(s"received message $message")
      Behaviors.same
  }
}

Behavior [String] でアクターを定義しました。これは、このアクターがタイプStringのメッセージのみを受信できることを意味します。 ActorContext も利用できます。これは、新しいアクターを生成したり、ActorSystemにアクセスしたりするための従来のアクターに似ています。

また、各メッセージの後の動作を同じままにするように定義しました。

アクターにメッセージを送信することもできます。

val stringActor = ActorSystem(StringActor(),"StringActor")
stringActor ! "Hello World"

作成したString Actor は、Stringのみを受け入れます。 別のタイプのメッセージがあると、コンパイラエラーが発生します。

3. アクターのライフサイクル

アクターは開始、再起動、および停止できます。 これがいつどのように発生するか、そしてプログラムのライフサイクルをどのように活用できるかを理解することが重要です。 たとえば、アクターが開始する前に、データベースへの接続や情報のログ記録を試みることができます。 アクターが停止されようとしていることがわかっているときに、アクターが使用したリソースを解放することもできます。

再起動した場合、アクターを以前の状態に復元することもできます。

3.1. PreStart

アクターが開始する直前に何をするかを制御できます。 従来のアクターでは、 preStart コールバックがありました。これは、アクターの最初のインスタンスの初期化中、つまりActorRefの作成時に直接1回だけ呼び出されます。

ただし、 Akka Typedでは、代わりにのBehavior.setupメソッドを使用して、メッセージを受信する前の初期化時にアクターが実行する必要があることを定義します。

object StringActor {
  def apply() : Behavior[String] = Behaviors.setup { context =>
    println("Before receiving messages")
    Behaviors.receiveMessage[String] {
      message =>
        println(s"received message $message")
        Behaviors.same
    }
  }
}
val stringActor = ActorSystem(StringActor(),"StringActor")
stringActor ! "Hello World"

Behavior.setup を呼び出すことにより、開始前にアクターの動作を制御できます。 各アクターインスタンスは、着信メッセージへの反応を開始する前に、setupメソッドで定義されたコードを実行します。

コードを実行すると、メッセージが処理される前に、Behavior.setupブロックのBehaviors.receiveMessageの直前のコードが呼び出されていることがわかります。

3.2. PostStop

また、アクターが停止しようとしているときにアクターが何をすべきかを制御することもできます。 アクターが使用するリソースを解放するか、データベース接続を閉じることができます。

従来のアクターでは、 postStop フックは、アクターを他のサービスから登録解除するなどの目的で使用されていました。 このフックは、アクターのメッセージキューが無効にされた後に実行されることが保証され、停止したアクターに送信されたメッセージは、ActorSystemdeadLettersにリダイレクトされました。

ただし、Akka Typedでは、代わりに PostStop シグナルを受信するように登録する必要があります。このシグナルは、アクターが停止しようとしているときにトリガーされ、の動作でreceiveSignalメソッドを呼び出します。俳優:

object StringActor {
  def apply() : Behavior[String] = Behaviors.setup { context =>
    println("Before receiving messages")
    Behaviors.receiveMessage[String] {
      case "stop" =>
       Behaviors.stopped
      case message =>
       println(s"received message $message")
       Behaviors.same
    }.receiveSignal {
      case(_, PostStop) =>
        println(s"stopping actor")
        Behaviors.stopped
    }
  }
}
val stringActor = ActorSystem(StringActor(),"StringActor")
stringActor ! "stop"

この例では、「stop」というメッセージを受信したときに停止するようにアクターを構成しました。 このコードを実行すると、 PostStop 部分分数で定義されているものが、アクターが停止したときに呼び出されることがわかります。

この信号を受信するように登録することで、アクターが停止したときに通常行うことを実行できます。

3.3. PreRestart

また、再起動時にアクターを制御することもできます。 アクターを再起動する場合、一般的なことには、ステートフルアクターの場合のイベントの再生や、外部データソースへの再接続が含まれます。 postStopと同様に、これを行うには、PreRestart信号を受信するように登録します。

object StringActor {
  def apply() : Behavior[String] = Behaviors.setup { context =>
    println("Before receiving messages")
    Behaviors.receiveMessage[String] {
      case "stop" =>
        Behaviors.stopped
      case "restart" =>
        throw new IllegalStateException("restart actor")
      case message =>
        println(s"received message $message")
        Behaviors.same
    }.receiveSignal {
      case(_, PostStop) =>
        println(s"stopping actor")
        Behaviors.stopped
      case (_, PreRestart) =>
        println("Restarting Actor")
        Behaviors.stopped
    }
  }
}
val stringBehaviour: Behavior[String] = Behaviors.supervise(StringActor()).onFailure[IllegalStateException](SupervisorStrategy.restart)
val stringActor = ActorSystem(stringBehaviour,"StringActor")
stringActor ! "restart" 

「再起動」メッセージで例外をスローするようにアクターを構成しました。 これで、クラシックアクターは例外が発生するたびに自動的に再起動されますが、Akka Typedの場合、これらの例外が発生するとアクターは停止します。 さらに、アクターを再起動できるようにするには、動作にスーパーバイザー戦略を定義して、アクターに再起動させたい例外または障害の種類を定義する必要があります。

val stringBehaviour: Behavior[String] = Behaviors.supervise(StringActor()).onFailure[IllegalStateException](SupervisorStrategy.restart)
val stringActor = ActorSystem(stringBehaviour,"StringActor")
stringActor ! "Hello World"
stringActor ! "restart"

スーパーバイザー戦略を使用してアクターの動作を定義することにより、アクターの機能に障害戦略を追加し、指定された障害が発生した場合に、この場合は再起動することを決定します。

特定のエラーが発生した場合、アクターを再起動させたくはありませんが、停止させたいと考えています。 この場合、SupervisorStrategy.stopを簡単に使用できます。

4. 結論

この記事では、アクターのライフサイクルと、これらのライフサイクルでアクターを制御する方法について説明しました。 アクターを使用する場合、これらのライフサイクルを理解すると、効率的で安全なコードを作成するのに役立ちます。これは、並行および分散プログラムを構築するときに使用できる強力なツールです。

いつものように、ソースコードはGitHubにあります。