1. 概要

アクターモデルは、非常に有望な並行プログラミングモデルです。 最も成功している実装の1つは、JVMのリファレンス実装であるAkkaです。

このチュートリアルでは、アクターモデルの主な機能と、Akkaが最後のバージョンである AkkaTypedライブラリでそれらを実装する方法について説明します。

2. シナリオ

アクターモデルをよりよく理解するために、この記事全体で具体的な例を使用します。 株式を売買できる株式ポートフォリオを定義します。

Stock は、株式の所有数量を表します。

case class Stock(name: String, owned: Long) {
  def buy(qty: Long): Stock = copy(name, owned + qty)
  def sell(qty: Long): Stock =
    if (qty <= owned)
      copy(name, owned - qty)
    else
      this
}

ポートフォリオクラスは、株式のマップのラッパーです。

case class Portfolio(stocks: Map[String, Stock]) {
  def buy(name: String, qty: Long): Portfolio = {
    // Code that adds stocks to the portfolio
  }
  def sell(name: String, qty: Long): Portfolio = {
    // Code that sells stocks from the portfolio
  }
}

ポートフォリオのクライアントは、たとえば、ソーシャルネットワークからの情報を分析し、所有する株式に対して実行する操作を決定するボットである可能性があります。

3. アプリケーションのセットアップ

まず、必要なプロジェクトの依存関係を設定します。 ここではSBTを使用します。 Akka Typedライブラリを使用するには、akka-actor-typedアーティファクトから依存関係をインポートする必要があります。

libraryDependencies += "com.typesafe.akka" % "akka-actor-typed_2.12" % "2.6.6"

一方、Akka Typedアクターをテストするには、akka-actor-testkit-typedアーティファクトから依存関係をインポートする必要があります。

libraryDependencies += "com.typesafe.akka" % "akka-actor-testkit-typed_2.12" % "2.6.6" % Test

4. この物語の主人公:俳優

アクターモデルは、アクターの概念に焦点を当てています。これは、組み込みのメッセージングメカニズムを使用して他のアクターと対話するシステム内の個別の計算単位を表します。

アクターはメッセージに応答して計算を実行するため、リアクティブオブジェクトの形式として説明できます

アクターは、他のアクターとは別のユニットを表します。 それらは互いに状態を共有しません。 アクターがシステム内の他のアクターと通信する唯一の方法は、それらにいくつかのメッセージを送信し、最終的に応答を待つことです。

アクターシステム内では、アクターは次のアクションのみを実行できます。

  • 自分自身または他のアクターに通信を送信する
  • 新しいアクターを作成する
  • 置換動作を指定する

ご覧のとおり、アクターモデルは非常に簡単です。

ライブラリAkkaTypedは、アクターをその動作のファクトリとして定義しています

アクターを定義するには、2つの異なるアプローチがあります。 オブジェクト指向アプローチまたは機能アプローチを使用できます。 この記事では、後者を使用します。

まず、顧客に関連する株式ポートフォリオを管理する銀行が必要です。

object Bank {
  final case class CreatePortfolio(client: ActorRef[PortfolioCreated])
  final case class PortfolioCreated(portfolio: ActorRef[PortfolioCommand])

  def apply(): Behavior[CreatePortfolio] =
    Behaviors.receive { (context, message) =>
      val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
      message.client ! PortfolioCreated(replyTo)
      Behaviors.same
    }
}

オブジェクトBankは、最初に入力されたアクターを表します。

apply メソッドを使用すると、アクターに関連付けられたビジネスロジックを作成できます。これは、アクターモデルが動作と呼びます。

アクターの動作は、アクターが処理して反応できる一連のメッセージを定義します。 この場合、 Bank アクターは、タイプ CreatePortfolioのメッセージをリッスンします。このようなメッセージにより、アクターは新しいポートフォリオを作成し、それらを要求したクライアントに返すことができます。

ビヘイビアタイプは、ビヘイビアを作成するためのファクトリです。

すべてのアクターは、他のアクターから同時に実行されます。 ただし、誰もその内部状態にアクセスできません。 状態の競合のため、アクターの実行中に競合状態が発生することはありません。

すべてのアクターにはメールボックスが関連付けられています。 メールボックスは、メッセージが処理される前にキューに入れられる場所です。 すべてのアクターに関連付けられているデフォルトのメールボックスには制限がないことに注意してください。 Akkaメールボックスの詳細については、Akkaのドキュメントを参照してください。

5. アクターのコミュニケーション方法:メッセージ

メッセージは、アクターにデータに対してビジネスロジックを実行するように依頼する唯一の方法です。 すべてのアクターは、そのプロトコルを定義します。これは、に反応できる一連のメッセージです。

メッセージが可変状態である必要はないため、不変クラスとして実装するのが最善です。アクターのプロトコルが複数のメッセージにまたがる場合は、ベースを提供することをお勧めします封印された特性

Bank アクターのプロトコルは、2つのメッセージ、1つの要求、および1つの応答で構成されています。

final case class CreatePortfolio(client: ActorRef[PortfolioCreated])
final case class PortfolioCreated(portfolio: ActorRef[PortfolioCommand])

アクターが処理できるメッセージのタイプは、そのBehaviorの定義によって強制されます。

def apply(): Behavior[CreatePortfolio] = 
  Behaviors.receive { (context, message) =>

アクターと対話するには、アクターへの参照が必要です。 ライブラリがアクターの参照を共有するために使用するタイプは、 ActorRef[-T]です。 型変数Tは、アクターが処理できるメッセージを定義します。

2つのアクター間の要求/応答通信モデルを実装する場合は、メッセージ内に、応答するアクターの参照を提供する必要があります。

この例では、 Bank は、新しいポートフォリオの作成を要求するクライアントに応答する必要があります。 実際、 CreatePortfolio メッセージには、呼び出し元への参照が含まれています。

すべてのアクターは、context.selfオブジェクトを介して独自のアクター参照にアクセスできます。 たとえば、銀行のクライアントは、新しいポートフォリオインスタンスの作成を要求するメッセージ内で自分自身を送信します。

Behaviors.receive { (context, message) =>
  bank ! CreatePortfolio(context.self) 
  // More behavior logic
}

アクター間には多くの可能な相互作用モデルがあります。 よりよく知られている2つは、tellパターンとaskパターンです。

5.1. テルパターン

俳優に話し、何かを聞かないでくださいとよく言われます。 この理由は、tellパターンが2人のアクターが通信するための完全に非同期の方法を表すためです。 アクター参照を取得したら、「!」演算子を使用してメッセージを非同期に送信できます。

この例では、 Bank は、新しいポートフォリオを作成すると、それを要求したクライアントに返します。

val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
message.client ! PortfolioCreated(replyTo)

テルパターンは完全に非同期です。 メッセージが送信された後、メッセージが受信されたかどうか、またはプロセスが成功したか失敗したかを知ることはできません。

5.2. 質問パターン

質問パターンは、要求/応答通信モデルを使用するためのより自然な方法を実装します。 リクエストとレスポンスの間に1:1の関係がある場合に処理できます。

コールバックを使用して非同期性を処理するプログラミングスタイルを実装します。 銀行のクライアントは、askパターンを使用して実装できます。

object BankClient {
  def apply(bank: ActorRef[CreatePortfolio]): Behavior[Unit] =
    Behaviors.setup { context =>
      implicit val timeout: Timeout = 3.seconds
      context.ask(bank, CreatePortfolio) {
        case Success(message) =>
          context.log.info("Portfolio received")
          message.portfolio ! Buy("APPL", 100L)
        case Failure(_) => context.log.info("Portfolio received")
      }
      Behaviors.ignore[Unit]
    }
}

質問パターンを使用するには、いくつかの手順が必要です。 まず、タイムアウトが必要です。 応答を待っているので、永遠に待つことはできません。 それ以外の場合は、アクターの実行をブロックします。

メッセージを送信するには、 context オブジェクトを使用して、askメソッドを呼び出します。

ask の最初のパラメーターは、メッセージを送信するアクターへのActorRefです。

2番目のパラメーターは、タイプ ActorRef =>Messageの関数です。 ケースクラスCreatePortfolioにはapply関数があるので、それを使用します。 これは、関数 ref => CreatePortfolio(ref)を渡すのと同じです。 ref 関数パラメーターは、応答を処理するアクターを表します。

最後に、リクエストの成功または失敗を処理する部分関数を提供します。 この関数の戻りタイプは、クライアントが処理できるBehaviorのタイプと同じである必要があります。

6. メッセージを広める:俳優の作成

上記の多くの例で見たように、 Akka Typedは、メソッドファクトリを使用してアクターのインスタンスを作成します。 これらのファクトリは、Behaviorsタイプにリストされています。 すべてのファクトリは、 Behavior [-T]、タイプのオブジェクトを返します。これは、アクターが処理するメッセージの種類を正確に定義します。

最も一般的なファクトリメソッドはBehaviors.receive:です。

Behaviors.receive { (context, message) =>
  val replyTo = context.spawn(PortfolioActor(), UUID.randomUUID().toString)
  message.client ! PortfolioCreated(replyTo)
  Behaviors.same
}

このメソッドは、アクターがコンテキストおよび受信したメッセージにアクセスできるようにする関数を入力として受け取ります。 context オブジェクトを使用すると、アクターはアクターシステムユーティリティにアクセスできます。

関数はBehaviorを返す必要があります。これは、前述したように、アクターはメッセージに関連付けられたビジネスロジックの実行後に変更できるためです。 実際の動作を変更する必要がない場合は、ファクトリメソッドBehaviors.sameを使用できます。

context オブジェクトが必要ない場合は、Behaviors.receiveMessageメソッドを使用できます。 たとえば、 Portfolio アクターによるメッセージの処理には、contextへのアクセスは必要ありません。

Behaviors.receiveMessage { message =>
  message match {
    case Buy(stock, qty) =>
      portfolio(stocks.buy(stock, qty))
    case Sell(stock, qty) =>
      portfolio(stocks.sell(stock, qty))
  }
}

場合によっては、外部メッセージに反応するだけでなく、起動時にいくつかのアクションを実行するアクターが必要になります。 ファクトリメソッドBehaviors.setupは、このシナリオを実装します。

たとえば、新しいクライアントがアプリケーションを開始するたびに、銀行に新しいポートフォリオを作成するように指示する必要があります。

object BankClientUsingTheTellPattern {
  def apply(bank: ActorRef[CreatePortfolio]): Behavior[PortfolioCreated] =
    Behaviors.setup { context =>
      bank ! CreatePortfolio(context.self)

      Behaviors.receiveMessage {
        case PortfolioCreated(portfolio) =>
          // Do something with the new portfolio
      }
    }
}

Behaviors.setup ファクトリメソッドを使用すると、クライアントは、外部からのメッセージに反応することなく、銀行に適切なメッセージを送信できます。

7. 俳優の行動とそれを変える方法

なぜライブラリはAkkaTypedと呼ばれるのですか? Behavior [-T] ジェネリック型は、アクターが T type変数を使用して受信できるメッセージを定義するため、コンパイラーはメッセージ送信の正確さをチェックする必要があります。

主な例では、アプリケーションで作成されたポートフォリオごとにアクターを定義します。 アクターが使用するプロトコルは、BuySellの2つのメッセージで構成されています。 各メッセージは、特性PortfolioCommandを拡張します。

sealed trait PortfolioCommand 
final case class Buy(stock: String, quantity: Long) extends PortfolioCommand 
final case class Sell(stock: String, quantity: Long) extends PortfolioCommand

Behavior [PortfolioCommand] を使用してアクターを定義すると、コンパイラーは、アクターが基本特性を拡張する適切なメッセージを処理するかどうかを確認します。

Behaviors.receiveMessage { 
  case Buy(stock, qty) => 
    // Buy some stocks
  case Sell(stock, qty) => 
    // Sell some stocks
}

アクターは、新しいメッセージの到着に反応し、いくつかのビジネスロジックを実行し、次のメッセージに応答するために使用する動作を選択します。 Behaviors typeに含まれているファクトリメソッドを使用すると、各メッセージの処理後にアクターの動作を変更することができます。

また、アクターは状態を共有しないとも言いました。 ただし、アクターの実行中に何らかの状態を維持する必要がある可能性が非常に高くなります。 行動の変化と俳優の状態の変化をどのように関連付けることができますか?

質問への答えは簡単です。状態は、変数を使用するのではなく、動作を変更することによって管理されます。 主な例では、アプリケーションで作成されたポートフォリオごとにアクターを定義します。

object PortfolioActor {
  
  def apply(): Behavior[PortfolioCommand] = {
    portfolio(Portfolio(Map.empty))
  }

  private def portfolio(stocks: Portfolio): Behavior[PortfolioCommand] = {
    Behaviors.receiveMessage {
      case Buy(stock, qty) =>
        portfolio(stocks.buy(stock, qty))
      case Sell(stock, qty) =>
        portfolio(stocks.sell(stock, qty))
    }
  }
}

PortfolioActorは、Portfolioオブジェクトへの参照を維持する必要があります。 このオブジェクトでは、アクターはビジネスロジックを実装するメソッドを呼び出して、いくつかの株を売買します。

ローカル変数を変更する代わりに、現在の Portfolio インスタンスを動作を定義するメソッドに渡します。このアプローチでは、変更可能な変数の使用が完全に排除されます。

純粋で不変のタイプの俳優! それは素晴らしいことです!

8. アクターシステム:すべてが始まる場所

アクターがメッセージを実行および送信できるようにする構造は、ActorSystemと呼ばれます。 これは、アクターの実行に必要なスレッドを割り当てるヘビーウェイトオブジェクトです。 通常、JVMプロセスごとに1つのActorSystemがあります。

アクターは階層を形成します。 アクターが別のアクターをスポーンするとき、それらはそれぞれ親と子であると言われます。 子役のライフサイクルは、その親に関連付けられています。 アクターは自分自身を停止することも、親によって停止することもできます

BankMain アクターは、BankBankClientUsingTheTellPatternを作成し、それらの親になります。

object BankMain {
  final case class Start(clientName: String)

  def apply(): Behavior[Start] =
    Behaviors.setup { context =>
      context.log.info("Creation of the bank")
      val bank = context.spawn(Bank(), "bank")
      Behaviors.receiveMessage { message =>
        context.log.info("Start a new client")
        context.spawn(BankClientUsingTheTellPattern(bank), message.clientName)
        Behaviors.same
      }
    }
}

階層の最上位には常にルートアクターがあり、ガーディアンアクターとも呼ばれます。 アクターシステムによって最初に開始されるのはアクターです。

この例では、BankMainが保護者のアクターです。

def main(args: Array[String]): Unit = {
  val system: ActorSystem[BankMain.Start] = ActorSystem(BankMain(), "main")
  system ! Start("Alice")
  system ! Start("Bob ")
}

保護者のアクターは、アクターシステムに送信されるすべてのメッセージを処理します。 保護者のアクターが停止すると、アクターシステム全体も停止します。

アクターは、次の動作としてBehavior.stoppedを返すのをやめることができます。 さらに、親アクターは、メソッド context.stop を呼び出し、適切な子アクター参照を渡す子アクターを停止できます。

アクターシステムは、アクターの参照を解決し、tellまたはaskパターンの両方を使用して送信されたすべてのメッセージをレシーバーアクターにルーティングする役割も果たします。

9. 結論

この記事では、アクターモデルの主な概念を紹介しました。 次に、AkkaTypedの実装に詳細に焦点を当てました。 アクターとは何か、何ができるか、そしてAkkaTypedでそれを実装する方法を見ました。

明らかに、監督、俳優のライフサイクルのよりきめ細かい処理など、他の多くの機能があります。 詳細については、公式ドキュメントをご覧ください。

いつものように、このチュートリアルのコードはGitHubから入手できます。