1. 序章

Elasticsearch は、 ElasticStackの中心にある分散型のRESTful検索および分析エンジンです。 何年にもわたって、Elasticsearchクラスターと対話するために多くのライブラリーが開発されてきました。

このチュートリアルでは、elastic4sを使用してScalaアプリケーションでElasticsearchを操作する方法を説明します。

2. Elasticsearchとは何ですか?

Elasticsearchはデータを一元的に保存するため、アプリケーションログや指標など、あらゆる形状やサイズの情報を検索、インデックス作成、分析できます。 これは、テキストドキュメントの形式で保存することによって実現されます。 Elasticsearchクラスターが複数のノードで構成されている場合、ドキュメントはクラスター全体に分散され、任意のノードからアクセスできます。

ドキュメントのコレクションは、いわゆるインデックスを形成します。 実際のところ、インデックス作成は、Elasticsearchクラスターにドキュメントを追加するプロセスです。

Elasticsearchは、HTTP経由で使用できる強力なRESTAPIを提供します。 また、JavaやPythonなどの多くのプログラミング言語用のさまざまな組み込みクライアントが付属しています。 Scalaエコシステムでは、elastic4sライブラリを使用できます。

3. elastic4sを使用したScalaでのElasticsearch

elastic4sはElasticsearch用のScalaクライアントです。 公式Javaクライアントも使用できますが、結果のコードはより冗長であり、EitherFutureなどのScalaコアクラスを活用できません。 たとえば、elastic4sは、nullではなくOptionを返します。 このライブラリはScalaコレクションもサポートしているため、Javaコレクションを使用する必要はなく、Stringの代わりにScalaのDurationを使用します。 ]Longは時間値を表します。

プロジェクトでelastic4sを使用するには、sbtを使用して2つのライブラリをプロジェクトにインポートする必要があります。

"com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % "7.16.0"
"com.sksamuel.elastic4s" %% "elastic4s-core" % "7.16.0"

DSLを使用するには、importステートメントを追加する必要があります。

import com.sksamuel.elastic4s.ElasticDsl._

デフォルトでは、elastic4sは応答を返すときにScala Futuresを使用します。ただし、elastic4sは、 ZIO Cats-Effect ScalaZ

次のセクションでは、elastic4sを使用してScalaでElasticsearchを操作する方法を説明します。

3.1. elastic4sクライアントの作成

Elastic4sを使用するための最初のステップは、クライアントを作成することです。

val client = ElasticClient(
  JavaClient(ElasticProperties("http://localhost:9200"))
)

上記の例では、 JavaClient をラップして、新しいElasticClientを作成しました。 さらに、Elasticsearchクラスターのエンドポイントを使用して後者を構成しました。 1つだけではなく複数のノードがある場合、複数のエンドポイントを渡すことができます

ElasticProperties("http://host1:9200,host2:9200,host3:9200")

アプリケーションの最後にクライアントを明示的に閉じる必要があります

client.close()

3.2. インデックスの作成

Elasticsearchクラスターに接続した後、コマンドの送信を開始できます。 クラスタがドキュメントを受信できるようにするためのインデックスを作成しましょう。

client.execute {
  createIndex("activities").mapping(
    properties(
      TextField("username"),
      DateField("when"),
      IpField("ip"),
      TextField("action")
    )
  )
}.await

上記の例では、ユーザーアクティビティを追跡するためのインデックスをモデル化しています。 アクティビティごとに、それを実行したユーザーのユーザー名、実行したアクション、実行したタイミング、および実行したIPアドレス([ X176X] ip )。

client.execute()はDSLの一部であり、クライアントが接続されているクラスターで操作を実行するようにelastic4sに指示します。この場合、そのような操作は CreateIndexApi :: createIndex()です。 。 まず、インデックスの名前、この場合はactivityを定義します。 次に、インデックスのプロパティを定義します。 Elastic4sは、さまざまなプロパティタイプを提供します。 この例では、 TextField DateField 、および IpField のみを示していますが、さらに多く( IntFieldShortFieldなど)を使用できます。整数の場合は)。

ElasticClient :: execute()では、Elasticsearchへの呼び出しを1回だけ実行できます。この構成では、awaitを呼び出して待機するFutureを返します。操作が完了するまで。 これはテスト目的には問題ありませんが、アプリケーションでは、戻り値を非同期で操作したい場合があります。

3.3. インデックスの一覧表示

CatsApi :: catIndices()を呼び出すことにより、クラスターで定義されているすべてのインデックスを一覧表示できます。 それを使用して、 活動インデックスは正しく作成されました:

client.execute {
  catIndices()
}.await match {
  case failure: RequestFailure => println(failure.error)
  case response: Response[Seq[CatIndicesResponse]] => println(response.result.exists(_.index == "activities"))
}

今回は、 await によって返される値をパターンマッチングします。これは、失敗( RequestFailure )または成功のいずれかである可能性があります。 この例では、応答リストをフィルタリングして、 activity という名前のインデックスがあるかどうかを確認しますが、その情報(ヘルスステータスなど)を出力することもできます。

インデックスが作成されたことを確認する簡単な方法があります– IndexAdminApi :: indexExists()操作

client.execute {
  indexExists("activities")
}.await match {
  case failure: RequestFailure => println(failure.error)
  case response: RequestSuccess[IndexExistsResponse] => println(response.result.exists)
}

この場合、必要な応答のタイプは IndexExistsResponse であり、result.existsのフィールドを出力することでactivitiesインデックスが存在することを確認できます。 X188X]応答。

3.4. インデックスでのドキュメントの作成

これで、作成したactivityインデックスにいくつかのドキュメントを書き込むことができます。

client.execute {
  indexInto("activities").fields(
    "username" -> "thomas",
    "when" -> Instant.now,
    "ip" -> "192.168.197.123",
    "action" -> "GetArticles"
  ).refresh(RefreshPolicy.Immediate)
}.andThen {
  case Success(_) =>
    client.execute(
      indexInto("activities").fields(
        "username" -> "robert",
        "when" -> Instant.now,
        "ip" -> "192.168.197.103",
        "action" -> "DeleteArticle"
      ).refreshImmediately
    )
}.await

IndexApi :: indexInto()を使用すると、ドキュメントをインデックスに書き込むことができます。 上記の例では、2回呼び出して、2つのアクティビティを追加します。1つはユーザー用です。 トーマスと1つロバート 。 この場合、Futureの構成可能性を活用して2つの操作を連鎖させます。

IndexRequest :: fields()を使用すると、ドキュメントをペアのシーケンス(String、Any)として指定できます。他の可能性もあります。 たとえば、 IndexRequest :: doc()を使用して、ペアのシーケンスの代わりにJSONドキュメントを作成できます。

最後に、ドキュメントをインデックスに追加した後、常に.refresh(RefreshPolicy.Immediate)を指定します。 これにより、Elasticsearchはクラスターを更新して、新しいドキュメントをすぐに確認できるようになります。 すぐに更新するのはかなり一般的であるため、 .refresh(RefreshPolicy.Immediate)の代わりに.refreshImmediatelyを指定することもできます。

3.5. インデックス内のドキュメントの一覧表示

インデックスに実際にドキュメントを作成したことを確認するために、SearchApi :: search()操作を呼び出すことができます

client.execute {
  search("activities")
}.await match {
  case failure: RequestFailure => println(failure.error)
  case response: RequestSuccess[SearchResponse] =>
    val searchHits = response.result.hits.hits.toList
    searchHits.foreach { searchHit =>
      println(s"${searchHit.id} -> ${searchHit.sourceAsMap}")
    }
}

結果の構造は少し複雑です。 特に、実際の結果を取得するには、response.result.hits.hitsにアクセスする必要があります。 これにより、Array [SearchHit]が得られます。ここで、SearchHitには、IDやドキュメント内のフィールドなど、ドキュメントに関するさまざまな有用な情報が含まれています。

上記の例では、Mapとして表されるドキュメントのIDとフィールドを印刷しています。 したがって、上記の例を実行すると、次の行が出力されます。

27P-sn0BEtaLvWE5iFPY -> Map(username -> thomas, when -> 2021-12-13T08:52:45.383498Z, ip -> 192.168.197.123, action -> GetArticles)
3LP-sn0BEtaLvWE5iVMO -> Map(username -> robert, when -> 2021-12-13T08:52:45.449883Z, ip -> 192.168.197.103, action -> DeleteArticle)

3.6. ドキュメントとインデックスの削除

最後に、インデックスからドキュメントを削除する方法を見てみましょう。

client.execute {
  deleteByQuery(
    "activities",
    termQuery("username", "robert")
  ).refreshImmediately
}

この場合、フィールドusernamerobertに設定されているすべてのドキュメントを削除しました。 DeleteApi :: deleteByQuery()を使用すると、インデックス内のドキュメントをフィルタリングし、特定の条件に一致するドキュメントのみを削除できます。

または、削除するドキュメントのIDがわかっている場合は、 DeleteApi :: deleteById()を呼び出すことができます。 ここでも、インデックス内のドキュメントをすぐに更新するために、 DeleteApi :: deleteByQuery()refreshImmediately。で構成しました。

上記のようにインデックス内のすべてのドキュメントを印刷すると、thomasのドキュメントのみが取得されます。

27P-sn0BEtaLvWE5iFPY -> Map(username -> thomas, when -> 2021-12-13T08:52:45.383498Z, ip -> 192.168.197.123, action -> GetArticles)

DeleteIndexApi :: deleteIndex()を使用すると、すべてのドキュメントを含むインデックス全体を削除できます

client.execute {
  deleteIndex("activities")
}

インデックス名として_allを指定することで、クラスター内のすべてのインデックスを削除できます。

4. 結論

この記事では、elastic4sを使用してScalaでElasticsearchを操作する方法を説明しました。 インデックスの作成からドキュメントの書き込み、検索からドキュメントの削除まで、さまざまな操作を検討しました。 また、 Future を、実際のElasticsearchクラスターでこのような操作を呼び出す簡単な方法として使用する方法についても説明しました。

いつものように、コードはGitHubから入手できます。