1. 序章

MongoDB は、今日最も人気のあるNoSQLデータベースの1つです。 BSON(Binary JSON)形式を使用して、データ(ドキュメント)をコレクションに保存します。 Scalaの場合、MongoDB用のドライバーがいくつかあります。 ただし、ReactiveMongoはそれらすべての中で最も人気があります。

このチュートリアルでは、ReactiveMongoと、それを使用してMongoDBでクエリを実行する方法について説明します。

2. ReactiveMongoについて

ReactiveMongoは、MongoDB用の非同期で非ブロッキングのScalaドライバーです。 標準のCRUD操作の実行に加えて、ストリームとしてのデータのクエリもサポートしているため、最小限のオーバーヘッドで大量のデータを処理できます。

3. 依存関係

まず、必要な依存関係をbuild.sbtファイルに追加しましょう。

"org.reactivemongo" %% "reactivemongo" % "1.0.3",
"de.flapdoodle.embed" % "de.flapdoodle.embed.mongo" % "3.0.0" % Test

ReactiveMongoの依存関係とは別に、組み込みのMongoDBの依存関係も追加します。 これにより、テスト用にメモリ内バージョンのMongoDBを開始できます。

4. 接続の作成

まず、MongoDBへの接続を作成しましょう。 データベースへの接続にはMongoDBURLを使用します。 たとえば、 movies という名前のローカルMongoDBデータベースに接続するには、URLを mongodb:// localhost:27017 /moviesとして指定できます。

val mongoDriver = AsyncDriver()
lazy val parsedURIFuture: Future[ParsedURI] = MongoConnection.fromString(mongoURL)
lazy val connection: Future[MongoConnection] = parsedURIFuture.flatMap(u => mongoDriver.connect(u))

データベースに接続するには、次を使用できます。

val db: Future[DB] = connection.flatMap(_.database(dbName))

このdbインスタンスを使用して、クエリを実行できる BSONCollection、へのインスタンスを取得できます。

val moviesCollection: Future[BSONCollection] = db.map(_.collection(collectionName))

5. サポートエンティティ

MongoDBコレクションをモデル化するためのサンプルcaseクラスを作成しましょう。

case class Movie(name:String, leadActor:String, genre:String, durationInMin: Int)

ケースクラスをBSONDocumentにマップするための暗黙的な変数を作成する必要があります。 これらは、ReactiveMongoが BSONDocumentをケースクラスインスタンスとの間で変換するためのスコープ内にある必要があります。

implicit def moviesWriter: BSONDocumentWriter[Movie] = Macros.writer[Movie]
implicit def moviesReader: BSONDocumentReader[Movie] = Macros.reader[Movie]

6. データの挿入

それでは、MongoDBコレクションにいくつかのドキュメントを挿入する方法を見てみましょう。

val movie = Movie("The Shawshank Redemption", "Morgan Freeman", "Drama", 144) 
  connection.getCollection("Movie").flatMap { col =>
  val insertResultFuture: Future[WriteResult] = col.insert.one(movie)
  insertResultFuture
  }
}

複数のドキュメントを一緒に挿入する場合は、 one()の代わりに many()メソッドを使用できます。

val allMovies: Seq[Movie] = getMoviesList()
connection.getCollection("Movie").flatMap { col =>
  col.insert.many(allMovies)
  }
}

7. コレクションからのドキュメントの取得

MongoDBの特定のフィールドに基づいてコレクションをフィルタリングする方法を見てみましょう。

val dramaMovies = connection.getCollection("Movie").flatMap(c =>
  c.find(BSONDocument("genre" -> "Drama"))
  .cursor[Movie]().collect(err = Cursor.FailOnError[List[Movie]]()))

特定の条件に一致する限られた数のドキュメントのみを取得したい場合は、収集するドキュメントの最大数を提供できます。 最長の2本のドラマ映画だけが欲しいとしましょう。

val longestTwoDramas = col.find(BSONDocument("genre" -> "Drama"))
  .sort(BSONDocument("durationInMin" -> -1))
  .cursor[Movie]().collect(2, Cursor.FailOnError[List[Movie]]())

sort()関数は、指定されたフィールドに基づいてドキュメントを並べ替えます。 値-1は降順でのソートに対応し、番号1は昇順に対応します。

フィルタを適用せずにコレクションからすべてのドキュメントを取得する場合は、findメソッドで空のBSONDocumentを使用できます。

val allMovies = connection.getCollection("Movie")
  .flatMap(_.find(BSONDocument()).cursor[Movie]()
  .collect(err = Cursor.FailOnError[List[Movie]]()))

クエリに複数の条件を指定することもできます。

val bradPittDramasFuture = connection.getCollection("Movie")
  .flatMap(_.find(
    BSONDocument(
      "genre" -> "Drama", 
      "leadActor" -> "Brad Pitt", 
      "durationInMin" -> BSONDocument("$gt" -> 130))
    ).cursor[Movie]().collect(err = Cursor.FailOnError[List[Movie]]()))

$ gt $ gte $ lt $ lte などの操作は、大なり小なりに使用されることに注意してください。 -フィルターよりも、フィルター条件でBSONDocumentでラップする必要があります。

8. ドキュメントの更新

それでは、ドキュメントを見つけて同じドキュメントのいくつかのフィールドを更新する方法を見てみましょう。 そのために、関数findAndUpdateを使用できます。

connection.getCollection("Movie").flatMap { col =>
  val updateStatus = col.findAndUpdate(
    BSONDocument("name" -> "Fight Club"),
    BSONDocument("$set" -> BSONDocument("durationInMin" -> 145))
  )
}

関数findAndUpdateは2つのパラメーターを取ります。 最初のパラメーターは、ドキュメントを選択するためのフィルターです。 2番目のパラメーターは、適用される更新です。 フィルター操作が複数のドキュメントに一致する場合、更新は最初の一致にのみ適用されます。 複数のレコードを更新するには、 UpdateBuilder を作成し、メソッド many()を呼び出す必要があります。

val updateFuture = connection.getCollection("Movie").flatMap { col =>
  val updateBuilder = col.update(true)
  val updates = updateBuilder.element(
    q = BSONDocument("genre" -> "Drama"),
    u = BSONDocument("$set" -> BSONDocument("genre" -> "Dramatic")),
    multi = true
  )
  updates.flatMap(updateEle => updateBuilder.many(Seq(updateEle)))
}

9. ドキュメントの削除

メソッドfindAndRemove()を呼び出して、コレクションからドキュメントを削除することもできます。 名前で映画を削除しましょう:

col.findAndRemove(BSONDocument("name" -> "Troy"))

10. Akka-Streamを使用したドキュメントのストリーミング

ReactiveMongoは、ドキュメントのストリーミングをサポートするようになりました。 ドキュメント全体をメモリにロードせずにデータを処理します。 ReactiveMongoには、akka-stream統合用のモジュールがあります。 これを使用するには、reactmongo-akkastream依存関係をbuid.sbtファイルに追加する必要があります。

"org.reactivemongo" %% "reactivemongo-akkastream" % "1.0.3"

これで、ストリーミングAPIを使用して、MongoDBからドキュメントをストリーミングおよび処理できます。 データベース内のすべての映画の長さの合計を見つける非常に簡単な例を実装してみましょう。 まず、ソースを作成する必要があります。

val source = col.find(BSONDocument()).cursor[Movie]()
  .documentSource(100, Cursor.FailOnError())

それでは、各映画の長さを抽出して合計しましょう。

val totalDurationFuture = source.map(_.durationInMin).runWith(Sink.fold(0)(_ + _))

11. 結論

このチュートリアルでは、ReactiveMongoを使用してMongoDBでクエリを実行する方法を説明しました。 ReactiveMongoの最も一般的で広く使用されている部分について説明しました。

いつものように、サンプルコードはGitHubから入手できます。