PlayFrameworkでの非同期タスクのスケジュール
1. 概要
このチュートリアルでは、 Play Framework を使用しながら、ルートハンドラーで非同期タスクをスケジュールする方法を説明します。
まず、非同期タスクをいつ使用できるかを指定します。 次に、PlayFrameworkでタスクをスケジュールする2つの方法を見ていきます。
2. なぜ非同期タスクが必要なのですか?
PlayFrameworkアプリケーションで非同期タスクを使用する理由は少なくとも3つあります。
- 複数のアプリケーションを含む長時間実行プロセスの開始
- アプリケーションの内部状態の更新をトリガーする
- 応答時間を増やすことなくユーザーアクションを追跡する
場合によっては、REST APIの呼び出しを受け取った後、長時間実行されるタスクを開始したいことがあります。 データベースからデータを取得し、PDFレポートを生成し、それをユーザーに送信するタスクがあるとします。 REST APIを使用するクライアントアプリケーションが、電子メールが送信されるまで待機することはあまり意味がありません。 Accepted HTTPステータスをすぐに返送して、バックグラウンドタスクを処理することをお勧めします。
また、アプリケーションキャッシュをクリアするか、データをリロードして、応答を提供するためにキャッシュを準備することもできます。 この場合、キャッシュが更新されるまで待ちたくありません。 結局のところ、HTTP接続は、アプリケーションがプロセスを終了する前にタイムアウトする可能性があります。
最後に、ユーザーアクションを追跡するためにイベントをメッセージキューに送信する必要がある場合があります。 追跡が監視目的で存在し、通知がビジネスプロセスの重要な部分ではない場合、バックグラウンドタスクでそれらを送信することを選択する場合があります。
3. 非同期タスクとしてのスケジューリング機能
Play Frameworkは、基盤となるテクノロジーとしてAkkaアクターシステムを使用します。 コントローラコードでは、アクターシステムとそのスケジューラーにアクセスできます。 スケジューラーにアクセスできる場合、バックグラウンドタスクとして任意の関数を実行できます。
まず、コントローラーの依存関係として ActorSystem を追加し、暗黙の依存関係としてExecutionContextを追加します。
class AsyncTaskController @Inject()(val controllerComponents: ControllerComponents, val actorSystem: ActorSystem)(implicit ec: ExecutionContext) extends BaseController {
def runAsync(): Action[AnyContent] = Action {
...
}
}
エンドポイントにアクセスするには、その構成をroutersファイルに追加します。
GET /async controllers.AsyncTaskController.runAsync()
これで、 ActorSystemのスケジューラーを取得し、それを使用して無名関数を30秒の遅延で実行するようにスケジュールできます:
import scala.concurrent.duration._
import scala.language.postfixOps
def runAsync(): Action[AnyContent] = Action {
Console.println(s"In route handler: ${DateTime.now()}")
actorSystem.scheduler.scheduleOnce(30 seconds) {
Console.println(s"30 seconds later: ${DateTime.now()}")
}
...
}
30秒は有効なFiniteDurationオブジェクトであることに注意してください。これは、scala.concurrent.duration。_パッケージから暗黙的な変換をインポートし、後置表記を有効にしているためです。 importscala.language.postfixOpsを使用します。
匿名関数がバックグラウンドで実行され、少なくとも30秒遅延することを確認するために、Play Frameworkアプリケーションを実行し、ブラウザーでhttp:// localhost:9000/asyncにアクセスしてエンドポイントを呼び出します。 ログには、次の2つのメッセージが表示されます。
In route handler: 2021-02-10T18:09:22.639+01:00
30 seconds later: 2021-02-10T18:09:52.794+01:00
4. アクターの使用
ActorSystem を使用することには、2つの主な利点があります。
- アクターはステートフルにすることができますが、同時に、アクターは一度に1つのメッセージを処理するため、同時アクセスについて心配する必要はありません。
- アクターは応答を送り返すことができます
Play Frameworkでアクターを使用するには、アクターをアクターシステムに追加するGuiceモジュールを定義する必要があります。 さらに、コントローラーへのアクター参照を挿入する必要があります。 アクター参照を挿入し、アクターにメッセージを送信することから始めましょう。
class AsyncTaskController @Inject()(..., @Named("async-job-actor") actor: ActorRef)(implicit ec: ExecutionContext) extends BaseController {
def runAsync(): Action[AnyContent] = Action {
actor ! "THIS IS THE MESSAGE SENT TO THE ACTOR"
...
}
}
@Named( “async-job-actor”)アノテーションは、どのアクターをコントローラーに渡すかをPlayFrameworkに指示します。 runAsync 関数内で、!メソッドを呼び出します。このメソッドは、指定されたオブジェクトをアクターに送信します。 シリアル化可能なオブジェクトをメッセージとして渡すことができます。
それを機能させるには、さらにいくつかのものが必要です。 まず、メッセージを受信するアクターを定義します:
class AsyncTaskInActor extends Actor {
override def receive: Receive = {
case msg: String =>
Console.println(s"Message ${msg} received at ${DateTime.now()}")
}
}
次に、Guice依存性注入モジュールにバインディングを追加します。
class ActorsModule extends AbstractModule with AkkaGuiceSupport {
override def configure(): Unit = {
bindActor(classOf[AsyncTaskInActor], "async-job-actor")
}
}
最後に、 application.conf ファイルを開き、 PlayFrameworkで使用されるモジュールに新しいモジュールを追加します。
play.modules.enabled += "actors.ActorsModule"
4.1. アクターに送信されるメッセージのスケジュール
Akkaアクターに送信されるメッセージを遅延させたい場合は、このチュートリアルに示されている両方のソリューションをマージし、パラメーターとしてアクター参照を受け入れるscheduleOnce関数を使用できます。
actorSystem.scheduler.scheduleOnce(
30 seconds,
actor,
"A TEST MESSAGE!!"
)
4.2. 定期的なタスクのスケジュール
場合によっては、定期的なタスクを実行する必要があります。 その場合、 scheduleAtFixedRate 関数を使用します。この関数は、タスクの初期遅延、後続の実行の間隔、アクターハンドラー、およびメッセージを受け入れます。
val cancellable = actorSystem.scheduler.scheduleAtFixedRate(
10 seconds,
5 minutes,
actor,
"recurring task message"
)
戻り値として、 Cancellable のインスタンスを取得します。これを使用して、定期的なタスクを停止できます。
cancellable.cancel()
5. 結論
このチュートリアルでは、 Scheduler を直接使用して、Play Frameworkで無名関数を非同期で実行する方法と、Akka ActorSystemにアクセスして非同期タスクを実行する方法を示しました。 カスタムアクターをActorSystemに追加し、アクター参照を挿入するGuiceモジュールを定義する方法を示しました。
完全な例はGitHubにあります。