1. 序章

このチュートリアルでは、PlayFrameworkとAkkaActorsをLagomFrameworkと統合する方法を理解します。 また、そのような統合が必要な理由と、それを実現するための可能な方法についても理解します。

また、統合を示すために、サンプルのLagomベースのアプリケーションを作成します。

2. Lagomを理解する

Lagom は、JavaとScala で柔軟で復元力があり、応答性の高いシステムを構築するための、非常に評判の高いフレームワークです。

これは、Lightbendによって維持されているオープンソースフレームワークです。 これは、ライブラリと開発環境を提供して、リアクティブマイクロサービスに基づくシステムをベストプラクティスで構築します。

Lagomは、PlayやAkkaなどのLightbendの他のリアクティブフレームワークを活用することで、開発から展開までの複数の側面をサポートします。

マイクロサービスは、それらの間の通信が緩く結合された、分離された自律的なものになるように設計されています。 Lagom は、HTTPまたはWebSocketを介した同期または非同期通信を容易にします。 さらに、Kafkaのようなブローカーを介したメッセージベースの通信も提供し、少なくとも1回の配信セマンティクスを提供します。

また、データを排他的に所有し、データを直接制御できるようにマイクロサービスを設計します。 これは、BoundedContextの原則に基づいています。 Lagomは、イベントソーシングやCQRS などのよく知られたデザインパターンを通じて、データの永続化を促進します。 Lagomは、非同期APIを介してデータベース内のイベントストリームを永続化します。 LagomのデフォルトのデータベースはCassandraです。

さらに、疎結合マイクロサービスを開発する上でのその他の重要な部分は、サービス検出とサービスゲートウェイです。 サービスが相互に通信する方法や外部クライアントに位置の透過性を提供するように要求します。

サービスロケーターはLagomの開発環境に組み込まれており、サービスが相互に検出して通信できるようにします。 外部クライアントがLagomサービスに接続できるようにするために、ServiceGatewayも組み込まれています。

3. Lagomでの実例

LagomをPlayofAkkaと統合するためのオプションを検討するには、最初に実用的な例が必要です。 このセクションでは、LagomFrameworkを活用してシンプルなマイクロサービスベースのアプリケーションを構築します。

さらに、この例を使用して、LagomをPlayまたはAkkaと統合する方法を理解します。 この例は、Lagomの標準ドキュメントで提供されているものに基づいていますが、必要な基本事項を十分にカバーしていることに注意してください。

3.1. 設定

通常、Lagomは、提供する機能の恩恵を受けることができる本格的なマイクロサービスアーキテクチャがある場合に役立ちます。 ただし、このチュートリアルの目的は、PlayAPIとAkkaAPIをLagomに統合する方法を示すことであるため、シンプルに保ちます。 永続性のない単一のマイクロサービスを定義し、後でAkkaとPlayの統合で拡張します。

LagomはJavaとScalaでAPIを提供します。 ただし、このチュートリアルの例ではScalaを使用します。 さらに、LagomにはJavaでMavenまたはsbtを使用するオプションがありますが、Scalaではsbtが唯一の選択肢です。

Lagomプロジェクトをブートストラップする最も簡単な方法は、Lagomが提供するスターターツールを使用することです。 または、プロジェクト構造を定義して、sbtにブートストラップを生成させることもできます。

organization in ThisBuild := "com.baeldung"
version in ThisBuild := "1.0-SNAPSHOT"
scalaVersion in ThisBuild := "2.13.0"

val macwire = "com.softwaremill.macwire" %% "macros" % "2.3.3" % "provided"
val scalaTest = "org.scalatest" %% "scalatest" % "3.1.1" % Test

lazy val `hello` = (project in file(".")).aggregate(`hello-api`, `hello-impl`)

lazy val `hello-api` = (project in file("hello-api"))
  .settings(
    libraryDependencies ++= Seq(
      lagomScaladslApi
    )
  )

lazy val `hello-impl` = (project in file("hello-impl"))
  .enablePlugins(LagomScala)
  .settings(
    libraryDependencies ++= Seq(
      lagomScaladslTestKit,
      macwire,
      scalaTest
    )
  )
  .settings(lagomForkedTestSettings)
  .dependsOn(`hello-api`)

これは、SBTビルドファイルで定義できる単純なプロジェクト構造です。

Lagomは、サービスインターフェイス用に個別のプロジェクトを定義し、マイクロサービスごとにその実装を提案していることに注意してください。 したがって、ご覧のとおり、プロジェクト「hello-api」と「hello-impl」で定義されたhello-worldマイクロサービスがあります。 さらに、プロジェクト「hello-impl」はプロジェクト「hello-api」に依存しています。

これ以外に、LagomにはCassandraやKafkaの依存関係のように機能する通常の依存関係があります。 これらはオプションであり、すべてのマイクロサービスで必要になるとは限らないことに注意してください。

3.2. メッセージの定義

最初に行う必要があるのは、サービスが消費および生成するメッセージを定義することです。 また、Lagomが要求メッセージと応答メッセージのシリアル化と逆シリアル化に使用できる暗黙的またはカスタムのメッセージシリアライザー確実に提供する必要があります。

メッセージをすばやく定義しましょう。

case class Job(jobId: String, task: String, payload: String)
object Job {
    implicit val format: Format[Job] = Json.format
}
case class JobAccepted(jobId: String)
object JobAccepted {
    implicit val format: Format[JobAccepted] = Json.format
}

では、これらのメッセージをもう少しよく理解しましょう。

  • 2つのケースクラスJobJobStatusおよびそれらのコンパニオンオブジェクトを定義しました
  • 暗黙的なJSONシリアル化を追加しました。 デフォルトでは、Lagomはこの目的でPlayJSONを使用します
  • これらのメッセージは、サービスの要求と対応する応答を表します

3.3. サービスの定義

これまで見てきたように、Lagomはマイクロサービスをサービスインターフェースとその実装に分割することを好みます。 したがって、次のステップは、サービスインターフェイスを定義することです。

trait HelloService extends Service {
    def submit(): ServiceCall[Job, JobAccepted]
    override final def descriptor: Descriptor = {
        import Service._
        named("hello")
            .withCalls(
                pathCall("/api/submit", submit _)
            ).withAutoAcl(true)
    }
}

これは、Lagomでサービス記述子として知られている単純なScalaの特性です。 サービス記述子は、サービスを実装および呼び出す方法を定義します。

ここでいくつかの重要なことを理解しましょう:

  • submit関数にマップする単一の呼び出し「/api /submit」を定義しました
  • この関数は、パラメーターJobおよびJobAcceptedを受け取るServiceCallにハンドルを返します。
  • これらのパラメータは、厳密またはストリーミング可能なメッセージタイプです。
  • このハンドルを使用して、作業を実行する呼び出しを呼び出すことができます
  • パスベースの識別子を使用して、パスとクエリ文字列を使用して呼び出しをルーティングしています
  • その他の可能な識別子には、名前ベースの識別子とREST識別子が含まれます

3.3. サービスの実装

次に、定義したサービスインターフェイスの実装を提供する必要があります。 これには、記述子で指定された各呼び出しの実装が含まれている必要があります。

class HelloServiceImpl()(implicit ec: ExecutionContext)
  extends HelloService {
    override def submit(): ServiceCall[Job, JobAccepted] = ServiceCall {
      job =>
        Future[String] {JobAccepted(job.jobId)} 
    } 
}

これは、サービス記述子を定義した関数の基本的な実装です。 ただし、注目に値することがいくつかあります。

  • メソッドは呼び出しを実行しませんが、実行される呼び出しをラムダとして返します。
  • これは、関数ベースの構成でこの呼び出しを構成するための便利な方法を提供します
  • 呼び出し自体は値を返しませんが、promiseであるFutureを返します。
  • これにより、非同期で非ブロッキングのリアクティブアプリケーションを作成するための強力な方法が得られます

3.4. Lagomアプリケーションの作成

ここで、サービスとその実装をLagomアプリケーションにまとめる必要があります。 Lagomは、コンパイル時の依存性注入を使用して、このLagomアプリケーションを相互に接続します。 Lagomは、コンポーネントの依存関係を見つける軽量マクロを提供するMacwireを好みます。

LagomApplicationをすばやく簡単に作成する方法を見てみましょう。

abstract class HelloApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with AhcWSComponents {
  override lazy val lagomServer: LagomServer = 
    serverFor[HelloService](wire[HelloServiceImpl])
}

ここでは、これらの単純な線の背後でいくつかの興味深いことが起こっています。

  • HelloApplicationはMacwireを介してAhcWSComponentsと混合されます
  • さらに、Scalaやサードパーティのその他の多くのコンポーネントを混在させることができます
  • Lagomがサービスバインディングを検出するために使用するメソッドlagomServerを実装します
  • Macwireのwireマクロを使用して、他の依存関係をHelloServiceImplに挿入できます。
  • このクラスは、メソッド serviceLocator の実装を必要とするため、依然として抽象的です。

最後に、アプリケーションがそれ自体をブートストラップできるようにアプリケーションローダーを作成する必要があります LagomApplicationLoader を拡張することにより、Lagomでこれを便利に行うことができます。

class HelloLoader extends LagomApplicationLoader {
    override def load(context: LagomApplicationContext): LagomApplication =
        new HelloApplication(context) {
            override def serviceLocator: ServiceLocator = NoServiceLocator
        }

    override def loadDevMode(context: LagomApplicationContext): LagomApplication =
        new HelloApplication(context) with LagomDevModeComponents

    override def describeService = Some(readDescriptor[HelloService])
}

コードのこの部分で注意すべき重要なことを見ていきましょう。

  • 必要な2つのメソッド、loadloadDevModeを実装しています。
  • ここで、適切なserviceLocatorをHelloApplicationミックスします。
  • メソッドdescribeServiceはオプションですが、サービスゲートウェイなどのコンポーネントの構成に役立ちます。

3.6. 構成

構成ファイルapplication.confで提供される値を使用して、Lagomサービスのさまざまな部分を構成することができます。

ただし、簡単な例では、構成する必要があるのはアプリケーションローダーだけです。

play.application.loader = com.baeldung.hello.impl.HelloLoader

3.5. 例の実行

これで、Lagomで簡単な実例を作成するために必要なすべての作業が完了しました。 これで、ようやくアプリケーションを実行する準備が整いました。

コマンドプロンプトとsbtツールを使用すると、Lagomアプリケーション全体を実行するための単一のコマンドです

sbt runAll

サーバーが正常にブートストラップされると、cURLなどのツールを使用してこのサービスにジョブを投稿できるようになります。

curl --location --request POST 'http://localhost:9000/api/submit' \
--header 'Content-Type: application/json' \
--data-raw '{
    "jobId":"jobId",
    "task":"task",
    "payload":"payload"
}'

4. LagomとPlayの統合

LagomはPlayフレームワークの上に実装されています。 Lagomサービスを構築する際、例で気付いたように、この詳細を意識する必要はありません。

ただし、Playには多くの強力な機能があり、それらに直接アクセスする必要がある場合があります。 一部のPlayAPIをLagomから直接呼び出すことができます。

4.1. Play Frameworkとは何ですか?

Play は、Lightbendによって維持されている生産性の高いJavaおよびScalaWebアプリケーションフレームワークです。 これは、軽量でステートレスでWebに適したアーキテクチャに基づいています。 Playフレームワークは、簡潔で機能的なプログラミングパターンも提供します。 内部でAkkaとAkkaStreamを活用して、リアクティブモデルと自然なスケーラビリティを提供します。

HTTPサーバー、強力なルーティングメカニズムなど、Playが提供するコンポーネントを使用してWebアプリケーションとRESTサービスを構築できます。 Playはデータベースアクセスについて意見がなく、いくつかの永続化ツールと統合することができます。

4.2. LagomからPlayにアクセスする必要があるのはなぜですか?

Lagomサービスを最初から構築することは完全に可能ですが、既存のアプリケーションにLagomを追加する必要がある状況に遭遇することがよくあります。 これらのアプリケーションは、さまざまなユースケースですでにPlayフレームワークを使用している可能性があります。 これにより、このようなアプリケーションの上に定義するLagomサービスを活用するための強力なツールが提供されます。

Lagomサービスから直接PlayAPIにアクセスする必要がある典型的なユースケースのいくつかを理解しましょう。

  • LagomとPlaygRPCルーターなどの既存のPlayルーターとの統合
  • AhcWSComponentsなどのPlayコンポーネントによって提供される機能にアクセスします

4.3. LagomからPlayAPIにアクセスする

シンプルなPlayルーターをLagomと統合するという目的をどのように達成できるか見てみましょう。 Playで単純なルーターを定義することから始めます。

class SimplePlayRouter(action: DefaultActionBuilder, parser: PlayBodyParsers) {
    val router = Router.from {
        case GET(p"/api/play") =>
            action(parser.default) { request =>
                Results.Ok("Response from Play Simple Router")
            }
    }
}

これは非常に単純なルーターであり、何の役にも立ちませんが、これをLagomと統合する方法を理解するのに役立ちます。 次のステップは、これをLagomアプリケーションローダーに接続し、Lagomサーバーに追加することです。

override lazy val lagomServer: LagomServer = 
  serverFor[HelloService](wire[HelloServiceImpl])
    .additionalRouter(wire[SimplePlayRouter].router)

ここでは、 Macwireのワイヤーマクロを使用してPlayルーターを挿入し、additionalRouterメソッドを使用してLagomサーバーに追加しています。

そもそもこれで十分ですが、少し問題があります。 この追加のルーターはサービス記述子の一部ではないため、ServiceGatewayはそれらをエンドポイントとして自動的に公開しません。

ただし、このルーターに固有のACL(アクセス制御リスト)をサービス記述子にすばやく追加できます。

named("hello")
    .withCalls(
        pathCall("/api/hello/:id", hello _),
    )
    .withAutoAcl(true)
    .withAcls(ServiceAcl(pathRegex = Some("/api/play")))

これで十分です。これで、サービスゲートウェイを介してPlayルーターの一部であるエンドポイントにアクセスできます。

curl http://localhost:9000/api/play

5. LagomとAkkaの統合

Playと同様に、Lagomサービスを構築する際にLagomがAkkaの上に構築されているという事実を必ずしも意識する必要はありません。 ただし、Akkaは、直接活用する必要のある非常に豊富な機能セットを提供します。 AkkaをLagomと統合し、Lagomサービスから直接Akka APIを呼び出すことも、その逆も可能です。

5.1. アッカとは?

Akka は、 Lightbendによって維持されているオープンソースライブラリのセットであり、スケーラブルで復元力のあるシステムの設計に使用できます。 Akkaは、アクターモデルを使用して、並行、並列、フォールトトレラント、および分散システムを開発するための抽象化レベルを提供します。 アクターモデルは、場所の透過性を提供しながら、メモリの可視性の問題を取り除くのに役立ちます。

Lagomは内部でAkkaライブラリを使用していくつかの機能を提供します。 たとえば、LagomPersistenceおよびPublish-SubscribeモジュールはAkkaの上に構築されています。 Lagomは、AkkaClusterを介してマイクロサービスにクラスタリングを提供します。 Lagomは、AkkaStreamsを使用したストリーミング非同期サービスも提供しています。

5.2. なぜAkkaとLagomを統合する必要があるのですか?

アクターモデルのおかげで、Akkaは、利用可能で、回復力があり、応答性の高いアプリケーションを作成するための独自の方法を提供します。  Lagomで作業しているときに、アプリケーションでこれらの属性を正確に実現する方法を制御するために、いくつかのAkkaAPIに直接アクセスする必要がある場合があります。 同様に、AkkaアクターからLagomサービスに信号を送る機会があるかもしれません。

AkkaとLagomを統合するように促す可能性のあるいくつかのユースケースを見てみましょう。

  • カスタム方式でワークロードをAkkaクラスター内のノードに分散します
  • AkkaアクターからLagomサービス実装へのメッセージの伝達

5.3. LagomからAkkaAPIにアクセスしますか?

AkkaをLagomと統合するのは非常に簡単です。 Akkaのほとんどすべてに、ActorSystemからアクセスできます。

Lagomの依存性注入を利用して、Lagomサービス実装または永続エンティティに現在のActorSystemを注入できます。

class HelloServiceImpl(system: ActorSystem)(implicit ec: ExecutionContext)

ここで、Lagomのデフォルトの依存性注入は、サービス実装で現在のActorSystemの注入を処理します。

リクエストメッセージのいくつかのパラメータに基づいて、着信リクエストをクラスタ内のノードにルーティングする必要があるとしましょう。 まず、着信リクエストを処理する Actor、を定義する必要があります。

class Worker() extends Actor {
    private val log = Logging.getLogger(context.system, this)
    override def receive = {
        case job @ Job(id, task, payload) =>
        log.info("Working on job: {}", job)
        sender ! JobAccepted(id)
        // perform the work...
    }
}

この単純なActorは、ジョブの詳細をログに記録し、送信者に確認するだけです。 次に、クラスターの一部またはすべてのノードでワーカーアクターを選択的に開始できます。

if (Cluster.get(system).selfRoles("worker-node")) {
    system.actorOf(Worker.props, "worker")
}

次に、クラスター内のワーカーノードにジョブを選択的にチャネルできるルーターを作成する必要があります。

val workerRouter = {
    val paths = List("/user/worker")
    val groupConf = ConsistentHashingGroup(paths, hashMapping = {
        case Job(_, task, _) => task
    })
    val routerProps = ClusterRouterGroup(
        groupConf,
        ClusterRouterGroupSettings(
            totalInstances = 1000,
            routeesPaths = paths,
            allowLocalRoutees = true,
            useRoles = Set("worker-node")
        )
    ).props
    system.actorOf(routerProps, "workerRouter")
}

ここでは、ConsistentHashingGroupに基づくClusterRouterGroupを使用しています。 これは、Jobtask属性を使用して、ジョブをグループ化し、ワーカーノードにルーティングします。 Akkaが提供するグループ化戦略には、 ClusterRouterGroup BroadcastGroup RandomGroup RoundRobinGroupなどのかなりの数があります。いくつか例を挙げると。

最後に、カスタムの方法でジョブを受け入れてクラスターノードにルーティングする準備が整いました。 これに対応するために、サービスの以前の実装を変更します。

override def submit(): ServiceCall[Job, JobAccepted] = ServiceCall {
    job =>
        implicit val timeout = Timeout(5.seconds)
        (workerRouter ? job).mapTo[JobAccepted]
}

ここでは、5秒のタイムアウトで、前に作成したルーターを使用して、着信ジョブをクラスターノードにルーティングしているだけです。

これで、以前と同じようにジョブを投稿できるようになります。これらは、定義どおりにルーティングされます。

5.4. AkkaからLagomAPIにアクセスする

ここで、AkkaからLagomAPIに簡単にアクセスする方法を見ていきます。

を拡張して、ワークロード実行の単純なActorの進行状況からシグナルを返します。 この通信を実現するために、よく知られているメッセージングパターンであるPublish-Subscribeを活用します。 Lagomは、 PubSubRegistryを介してこれをサポートします。これは、トピックのパブリッシュおよびサブスクライブに使用できるPubSubRefを提供します。

モジュール「hello-impl」のsbtビルドファイルに必要な依存関係を追加することから始めます。

libraryDependencies ++= Seq(
    lagomScaladslTestKit,
    lagomScaladslPubSub,
    macwire,
    scalaTest

前と同じように、Lagomの依存性注入を使用して、PubSubRegistryのインスタンスをサービス実装に注入します。

class HelloServiceImpl(system: ActorSystem, pubSub: PubSubRegistry)(implicit ec: ExecutionContext)

また、パブリッシュおよびサブスクライブするメッセージを定義する必要があります。

case class JobStatus(jobId: String, jobStatus: String)
object JobStatus {
    implicit val format: Format[JobStatus] = Json.format
}

ここで、前に定義した Actor を変更して、 PubSubRegistry を活用し、タイプJobStatusのメッセージをトピックに公開します。

class Worker(pubSub: PubSubRegistry) extends Actor {
    private val log = Logging.getLogger(context.system, this)
    override def receive = {
        case job @ Job(id, task, payload) =>
            log.info("Working on job: {}", job)
            sender ! JobAccepted(id)
            val topic = pubSub.refFor(TopicId[JobStatus]("job-status"))
            topic.publish(JobStatus(job.jobId,"started"))
            // perform the work...
            topic.publish(JobStatus(job.jobId,"completed"))
    }
}

ここでは、注入された PubSubRegistry を使用して、トピックと呼ばれるPubSubRefを取得しています。 次に、 PubSubRef#publishを使用して更新を送信しています。

これで、別のサービスコールを介してステータスの更新をストリーミングできます。 サービスインターフェイスに新しい呼び出しを追加しましょう。

def hello(id: String): ServiceCall[NotUsed, String]
def doWork(): ServiceCall[Job, JobAccepted]
def status(): ServiceCall[NotUsed, Source[JobStatus, NotUsed]]

override final def descriptor: Descriptor = {
    import Service._
    named("hello")
        .withCalls(
            pathCall("/api/hello/:id", hello _),
            pathCall("/api/dowork", doWork _),
            pathCall("/api/status", status _)
        )
        .withAutoAcl(true)
}

この新しいメソッドの戻りタイプはSourceであることに注意してください。これは、実際には非同期ストリーミングを有効にするAkkaStreamAPIです。 また、ここで紹介した新しいメソッドの実装を提供する必要があります。

override def status(): ServiceCall[NotUsed, Source[JobStatus, NotUsed]] = ServiceCall {
    _ =>
        val topic = pubSub.refFor(TopicId[JobStatus]("job-status"))
        Future.successful(topic.subscriber)
}

今やるべきことは、Lagomアプリケーションを更新し、PubSubComponentsという新しいコンポーネントを導入することだけです。

abstract class HelloApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with PubSubComponents
    with AhcWSComponents {
    // Provide service bindings as before
}

これで、サービスコールにアクセスしてワークロードを投稿し、リアルタイムの更新を取得する準備が整いました。 リアルタイム更新のために次のエンドポイントにアクセスするには、Websocketクライアントが必要です。

ws://localhost:9000/api/status

6. バージョンに関する注記

Lagomは内部的にPlayとAkkaの上に構築されているため、はそれらのライブラリのバージョンに特定の依存関係があります。 Lagomアプリケーションでは、特定の理由で1つまたは複数のライブラリをアップグレードする必要がある場合があります。

Lagomは、これらのライブラリのバージョンをオーバーライドおよびアップグレードするための便利な方法を提供します。 これは、アプリケーションのsbtビルドファイルでdistributionOverridesを提供することで実現できます。

さらに、Lagomは、直接および一時的に多くの依存関係を追加します。 ただし、Akkaが提供するすべてのライブラリが含まれているわけではありません。 必要になる可能性があるが、まだ含まれていない他の依存関係を追加することもできます。

ただし、互換性のないバージョンを混在させないように注意する必要があります。 Akkaには厳密なバイナリ互換性規則があり、それらを破らないようにする必要があります。 また、依存関係を解決する際のsbtにより、トランジェントに直接宣言された最新のものが取得されることに注意する必要があります。 ツールsbt-dependency-graphは、プロジェクトの依存関係の分析に非常に役立ちます。

7. 結論

要約すると、このチュートリアルでは、Lagomフレームワークについて説明し、Lagomを使用して非常に単純なアプリケーションを作成しました。 また、LagomがPlayとAkkaに基づいてどのように構築されているかも理解しました。

さらに、LagomからPlayとAkkaの一部に直接アクセスできる理由と方法を調査しました。 これらの統合を理解するために、単純なアプリケーションを拡張しました。

いつものように、この記事のソースコードはGitHubにあります。