Scalaで入力されたメールボックス
1. 序章
メールボックスは、アクターモデルの基本的な部分の1つです。 メールボックスメカニズムを介して、アクターはメッセージの受信をその詳細から切り離すことができます。
それでは、アクターシステムの最も有名な化身である Akka Typed が、メールボックスの概念をどのように実装するかを見てみましょう。
2. Akkaの依存関係
SBTを使用して、必要なプロジェクトの依存関係を設定します。 Akka Typedライブラリを使用するには、akka-actor-typedアーティファクトから依存関係をインポートする必要があります。
libraryDependencies += "com.typesafe.akka" % "akka-actor-typed_2.12" % "2.6.11"
Akka Typedアクターをテストするには、akka-actor-testkit-typedアーティファクトから依存関係をインポートする必要があります。
libraryDependencies += "com.typesafe.akka" % "akka-actor-testkit-typed_2.12" % "2.6.11" % Test
3. ユーザーのナビゲーションのログ
まず、アクターは、受信した通信に応答してアクションを実行するオブジェクトです。 したがって、Akka Typedでは、そのようなコミュニケーションは俳優のメッセージです行動定義する
この時点で、議論を進めるための具体的な例が必要です。 ウェブサイトを通じて靴を販売する新しいeコマースビジネスを始めたばかりだと想像してみてください。 ユーザーの行動をよりよく理解するために、eコマースサイト内でのユーザーのナビゲーションに関する情報を収集したいと思います。
たとえば、マウスの動き、クリック、およびユーザーがキーボードで数字を入力するテキストを収集してみましょう。
sealed trait UserEvent {
val usedId: String
}
case class MouseClick(override val usedId: String, x: Int, y: Int) extends Event
case class TextSniffing(override val usedId: String, text: String) extends Event
case class MouseMove(override val usedId: String, x: Int, y: Int) extends Event
したがって、リアクティブシステムの力を解き放つために、そのような情報を収集するアクターを定義します。
val eventCollector: Behavior[UserEvent] = Behaviors.receive { (ctx, msg) =>
msg match {
case MouseClick(user, x, y) =>
ctx.log.info(s"The user $user just clicked point ($x, $y)")
case MouseMove(user, x, y) =>
ctx.log.info(s"The user $user just move the mouse to point ($x, $y)")
case TextSniffing(user, text) =>
ctx.log.info(s"The user $user just entered the text '$text'")
}
Behaviors.same
}
4. 私たちのビジネスのスケーリング(または台無し)
前述したように、アクター eventCollector は、メールボックスから処理するメッセージを読み取ります。 メールボックスは、メッセージを保持するデータ構造にすぎません。アクターのメールボックスをメッセージで埋めるのは、ActorSystemです。
ある日、私たちのeコマースサイトの広告がVogue Americaに表示され、翌日、サイトをナビゲートするユーザーの数が10倍になります。 eventCollector アクターはどうなりますか? 予想外の負荷の増加に対して弾力性はありますか?
残念ながら、簡単に言えば、アクターは膨大な数のメッセージに圧倒され、システムはOutOfMemoryErrorで終了します。 実際、アクターのデフォルトのメールボックスはSingleConsumerOnlyUnboundedMailboxです。
名前が示すように、メールボックスには制限がありません。つまり、配信されたメッセージは拒否されません。 その上、アクターシステムに実装された背圧メカニズムはありません。 したがって、着信メッセージの数がアクターの実行ペースよりもはるかに多い場合、システムはすぐにメモリを使い果たします。
さらに、 SingleConsumerOnlyUnboundedMailbox メールボックスを使用するアクターは、それを他のアクターと共有できず、複数のプロデューサーと1つのコンシューマーのみとの通信モデルを実装します。
貧弱なeventCollectorアクターが私たちのビジネスを台無しにすることをどのように回避できますか? アクターを構成する方法を一緒に見てみましょう。
5. 多くの種類のメールボックス
幸いなことに、AkkaTypedライブラリにはかなりの数のメールボックスタイプがあります。 デフォルトのを受け入れるのではなく、作成中にアクターのメールボックスタイプを設定できます。 したがって、 Props オブジェクトを使用して、適切なメールボックスを選択します。
ctx.spawn(eventCollector, id, MailboxSelector.bounded(1000))
ご覧のとおり、 MailboxSelector ファクトリでは、適切に構成されたPropsオブジェクトを作成できます。 上記の例では、固定数のメッセージのみを格納する制限付きメールボックスを提供するようにアクターを構成しました。 制限付きメールボックスについては、この記事の後半で詳しく説明します。
さらに、 MailboxSelector.fromConfig ファクトリメソッドを使用して、構成プロパティからメールボックスの種類を読み取ることができます。
val props = MailboxSelector.fromConfig("mailboxes.event-collector-mailbox")
ctx.spawn(eventCollector, s"{$id}_1", props)
したがって、構成ファイルは次のようになります。
mailboxes {
event-collector-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
}
一般に、構成をハードコーディングしない方がよい。 ですから、このアプローチが望ましいのです。なぜなら、私たちにはどんな人生が待ち受けているのかわからないからです。
さて、メールボックスには多くの種類があることを学びました。 大まかに言って、2つの異なる機能を使用してそれらを分類できます。
- 有界対。 無制限
- ブロッキングvs。 ノンブロッキング
利用可能なメールボックスタイプの完全なリストは、AkkaTyped公式ドキュメントにあります。
5.1. 有界対。 無制限
すでに述べたように、無制限のメールボックスは無期限に成長し、メッセージのプロデューサーがコンシューマーよりもはるかに速い場合、使用可能なすべてのメモリを消費します。 したがって、この種のメールボックスは、些細なユースケースでのみ使用します。
一方、制限付きメールボックスは、固定数のメッセージのみを保持します。 アクターシステムは、メールボックスがいっぱいになると、アクターに到着するすべてのメッセージを破棄します。 このようにして、メモリの不足を回避できます。
少し前に行ったように、Mailbox.boundedファクトリメソッドを使用してメールボックスのサイズを直接構成できます。 または、構成プロパティファイルから指定することもできます。
mailboxes {
event-collector-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 100
}
}
上記の例は、制限されたメールボックスが光る明確な例です。 相手がシステムを稼働状態に維持している場合、一部のメッセージが失われることを恐れません。
新しい質問が発生するはずです:破棄されたメッセージはどこに行きますか? 彼らはただ捨てられているのですか? 幸いなことに、アクターシステムでは、デッドレターのメカニズムを通じて、破棄されたメッセージに関する情報を取得できます。これがどのように機能するかについては、すぐに詳しく説明します。
5.2. ブロッキングvs。 ノンブロッキング
アクターモデルにとっては奇妙に聞こえるかもしれませんが、Akkaは、メッセージのプロデューサーをブロックするタイプのメールボックスも提供します。 実際、ブロックしているメールボックスを使用すると、送信者は、メッセージがメールボックスに正常に配信されたことをアクターシステムが通知するまでブロックします。
明らかに、プロデューサーを永遠に待たせるのは良い考えではありません。 したがって、Akkaは、mailbox-push-timeout-timeと呼ばれるタイムアウト設定を備えた制限付きメールボックスを提供します。
mailboxes {
event-collector-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 100
mailbox-push-timeout-time = 1s
}
}
送信者は、メッセージがメールボックスに挿入されるのを指定された時間待機します。 その後、アクターシステムはデッドレターにメッセージを送信し、プロデューサーは自由に処理を進めることができます。 ただし、 mailbox-push-timeout-time をゼロに設定すると、非ブロッキングメールボックスが再度取得されます。
したがって、ブロッキング、制限付きメールボックスの使用は、バックプレッシャーを実装するAkkaの方法です。
6. デッドレター
メッセージがアクターメールボックスに書き込まれない場合は常に、アクターシステムはメッセージを /deadLettersと呼ばれる合成アクターにリダイレクトします。 デッドレターメッセージの配信保証は、システム内の他のメッセージと同じです。 したがって、そのようなメッセージをあまり信頼しない方がよいでしょう。 デッドレターの主な目的はデバッグです。
アクターシステムオブジェクトの専用メソッドを使用して、デッドレターをリッスンしているデフォルトのアクターへの参照を取得できます。
val defaultDeadLettersActor: ActorRef[DeadLetter] = system.deadLetters[DeadLetter]
さらに、アクターはakka.actor.DeadLetterメッセージを受信するようにサブスクライブできます。 アクターシステムは、イベントストリームと呼ばれる特別な通信チャネルを使用してデッドレターを配信します。
したがって、アクターをイベントストリームにサブスクライブするには、アクターはその動作でDeadLetterメッセージをリッスンする必要があります。
val deadLettersListener: Behavior[DeadLetter] = Behaviors.receive { (ctx, msg) =>
msg match {
case DeadLetter(message, sender, recipient) =>
ctx.log.debug(s"Dead letter received: ($message, $sender, $recipient)")
Behaviors.same
}
}
次に、アクターシステムは、デッドレターをそのアクターにリダイレクトするようにインストルメント化する必要があります。
val deadLettersActor: ActorRef[DeadLetter] =
system.systemActorOf(deadLettersListener, "deadLettersListener")
system.eventStream.tell(EventStream.Subscribe[DeadLetter](deadLettersActor))
ただし、デッドレターはネットワークを介して伝播されないため、サブスクライブされたアクターはローカルシステムで公開されたデッドレターのみを受信することを覚えておく必要があります。
7. 結論
要約すると、この記事では、アクターのメールボックスの概念を紹介しました。 例を通して、制限のないメールボックスの長所と短所を活用し、代わりに制限のあるメールボックスをいつ使用するかを検討しました。 メールボックスのブロックと非ブロック、およびこれらの概念がバックプレッシャにどのように関連するかについて話しました。
さらに、カスタムメールボックスの実装、ディスパッチャー、アクター間でメールボックスを共有する方法など、メールボックスには他にも多くの処理が必要です。
いつものように、コードはGitHubでから入手できます。