1. 序章

どのエンタープライズシステムでも、複数のシステムと統合する能力は重要な要件です。 大規模な組織には通常、相互に通信する必要のある特殊なシステムがあり、マイクロサービスアーキテクチャでは、統合がさらに重要になります。

このチュートリアルでは、JavaとScalaの一般的な統合フレームワークである Alpakka、について説明します。

2. アルパカについて

Alpakkaは、比較的新しい Enterprise IntegrationFrameworkです。 Reactive Streams の原則に従って実装され、AkkaおよびAkka-Streamsの上に構築されます。 他のほとんどの統合フレームワークとは異なり、 Alpakkaは、ストリーミングとリアクティブプログラミングをサポートするようにネイティブに設計されています。 創業以来、アルパカは統合システムの構築方法にプラスの影響を与えることができました。

3. キャメルとの比較

Apache Camel は、統合パイプラインの事実上のフレームワークと見なされています。 Camelはすでに300以上の異なるシステムをサポートしており、多くの大企業アプリケーションがそれを使用しています。

Alpakkaは、ApacheCamelの代替手段を提供します。 比較的若いですが、アルパカは多くの前向きな注目を集めています。

キャメルに対するアルパカの利点のいくつかは次のとおりです。

  • より良い型安全性
  • ストリーミングとバックプレッシャーのネイティブサポート
  • 非同期プログラミングのためのより良いDSL

4. シナリオ例

このチュートリアルでは、車両からIoTデバイスによって生成されたデータを処理するための単純な統合パイプラインを構築します。

簡単にするために、さまざまなIoTデバイスから生成されたデータがフラットファイルに継続的に書き込まれると仮定します。 ただし、MQTTプロトコルまたは複雑なKafkaシステムをソースとして使用するIoTデバイスの場合もあります。

Alpakkaコネクタを作成して、ファイルからデータを読み取り、MongoDBデータベースにプッシュします。

次の図は、統合コネクタの概要を示しています。

5. Alpakkaコネクタの構築

それでは、AlpakkaMongoDBコネクターの構築を始めましょう。

5.1. 依存関係

まず、必要な依存関係をbuild.sbtに追加します。

libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "2.0.1", 
  "com.typesafe.akka" %% "akka-stream" % "2.6.9",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.9.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % "2.0.2"
)

5.2. コネクタの書き込み

それでは、コネクタロジックを書いてみましょう。 まず、MongoDBのデータベース接続を作成する必要があります。

final val client = MongoClients.create("mongodb://localhost:27019")
final val db = client.getDatabase("vehicle-tracker")

次に、関連するケースクラスとコーデックレジストリを作成します。 MongoDBドライバーはこれを使用して、ケースクラスをBSONオブジェクトに、またはその逆に変換します。

final case class GPSLocation(lat: Double, lng: Double)
final case class VehicleData(vehicleId: Long, location: GPSLocation)

val vehicleCodec = fromRegistries(fromProviders(classOf[VehicleData], classOf[GPSLocation]), DEFAULT_CODEC_REGISTRY)

次に、 db インスタンスを使用して、MongoCollectionのインスタンスを作成しましょう。

val vehicleDataCollection: MongoCollection[VehicleData] = 
  db.getCollection(classOf[VehicleData].getSimpleName, classOf[VehicleData])
    .withCodecRegistry(CodecRegistry.vehicleCodec)

データのソース(Alpakka Source)としてフラットファイルを使用し、AlpakkaSinkとしてMongoDBを使用しています。 FileTailSource は、ファイルから継続的に読み取るためのAlpakkaコネクタです。

これで、フラットファイルから車両データを読み取り、MongoDBデータベースにプッシュできます。

val fs = FileSystems.getDefault
def init() = {
  FileTailSource.lines(
    path = fs.getPath(filePath),
    maxLineSize = 8192,
    pollingInterval = 100.millis
  ).map(s => {
    val v = s.split(",")
    VehicleData(v(0).toLong, GPSLocation(v(1).toDouble, v(2).toDouble))
  }).runWith{
    MongoSink.insertOne(vehicleCollection)
  }
}

上記のコードでは、FileTailSourceはフラットファイルからデータを継続的に読み取ります。 Alpakkaコネクタは、pollingIntervalとして指定した定期的な時間間隔でファイルへの新しい変更を探します。

maxLineSize は、ByteStringとして許可される行の最大サイズです。 行のサイズがmaxLineSizeを超えると、ストリームは失敗します。

Akka Streamsフローは、このフラットファイルデータをVehicleDataケースクラスのインスタンスに変換します。 次に、MongoSinkはこのVehicleDataを取得し、MongoDBに挿入します。

わずか10行のコードで、Alpakkaを使用してストリームベースの統合パイプラインを構築できます。

6. その他のアルパカのソースとシンク

前の例では、FileTailSourceを使用しました。 さらに、FlowおよびSinkに変更を加えることなく、他のソースを使用してデータを読み取ることができます。

リストから単純なAkkaストリームソースを作成するとします。

val simpleSource = Source(List(
  "1, 70.23857, 16.239987",
  "1, 70.876, 16.188",
  "2, 17.87, 77.71443",
  "3, 55.7712, 16.9088"
))

ここで、 FileTailSourceを上記のsimpleSource、に置き換え、他のすべてのコードを同じに保ちます。

simpleSource.map(s => {...}).runWith{...}

同様に、要件に応じて、他のソースまたはシンクを使用できます。

7. ストリーミングと逆圧

前のセクションで述べたように、Alpakkaはストリーミング操作をサポートするように設計されています。 AlpakkaはAkkaStreamsの上に構築されているため、AkkaStreamsのすべての概念がサポートされています。 最も重要な機能の1つは背圧です。

前の例では、 MongoSink は、新しいメッセージを受け入れることができる要求要求をアップストリームFlowに送信します。 次に、フローはデマンドをそのアップストリームFileTailSourceに送信します。 このFileTailSourceは、ファイルからコンテンツを読み取り、フローにプッシュしてから、MongoSinkにプッシュします。

MongoDBの操作に時間がかかる場合、シンクはデマンドを返送しないため、ファイルから行が読み取られる速度が低下します。 このようにして、大きなオーバーヘッドを発生させることなく、大きなファイルを処理できます。

8. 結論

この記事では、Alpakkaを見て、それを使用して堅牢な統合パイプラインを構築する方法を確認しました。

いつものように、この記事で使用されているコードサンプルは、GitHubからで入手できます。