開発者ドキュメント

spring-data-mongodb-tailable-cursors

Spring Data MongoDB Tailableカーソル

1. 前書き

このチュートリアルでは、https://www.baeldung.com/spring-data-mongodb-tutorial [Spring Data MongoDB]でテーラブルカーソルを利用して、MongoDBを無限データストリームとして使用する方法について説明します。

2. テール可能なカーソル

クエリを実行すると、データベースドライバはカーソルを開き、一致するドキュメントを提供します。 デフォルトでは、クライアントがすべての結果を読み込むと、MongoDBは自動的にカーソルを閉じます。 したがって、回転すると有限のデータストリームが生成されます。
ただし、クライアントが最初に返されたすべてのデータを消費し、無限のデータストリームを作成した後でも、開いたままであるテール可能なカーソルでキャップ付きコレクションを使用できます。または株式の更新。
Spring Data MongoDBプロジェクトは、テーラブルカーソルを含むリアクティブデータベース機能の利用を支援します。

3. セットアップ

前述の機能を実証するために、単純なログカウンターアプリケーションを実装します。 すべてのログを収集して中央の場所に保存するログアグリゲーター(MongoDBの上限付きコレクション)があると仮定します。
まず、単純な_Log_エンティティを使用します。
@Document
public class Log {
    private @Id String id;
    private String service;
    private LogLevel level;
    private String message;
}
次に、ログをMongoDBの上限付きコレクションに保存します。 https://docs.mongodb.com/manual/core/capped-collections/[Capped collections]は、挿入順序に基づいてドキュメントを挿入および取得する固定サイズのコレクションです。 _MongoOperations.createCollection_を使用して作成できます。
db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
  .capped(true)
  .sizeInBytes(1024)
  .maxDocuments(5));
キャップ付きコレクションの場合、_sizeInBytes_プロパティを定義する必要があります。 さらに、_maxDocuments_は、コレクションが持つことができるドキュメントの最大数を指定します。 到達すると、古いドキュメントはコレクションから削除されます。
第三に、適切なhttps://search.maven.org/search?q=a:spring-boot-starter-data-mongodb-reactive[Spring Boot starterdependency]を使用します。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    <versionId>2.1.6.RELEASE</versionId>
</dependency>

4. リアクティブテール可能カーソル

link:#messagelistener [imperative]とリアクティブMongoDB APIの両方で、調整可能なカーソルを使用できます。 *リアクティブバリアントを使用することを強くお勧めします*。
リアクティブアプローチを使用して_WARN_レベルログカウンターを実装しましょう。 * _ReactiveMongoOperations.tail_ methodを使用して無限ストリームクエリを作成できます。*
新しいドキュメントが制限付きコレクションに到着し、https://www.baeldung.com/queries-in-spring-data-mongodb [filterクエリ]:
private Disposable subscription;

public WarnLogsCounter(ReactiveMongoOperations template) {
    Flux<Log> stream = template.tail(
      query(where("level").is(LogLevel.WARN)),
      Log.class);
    subscription = stream.subscribe(logEntity ->
      counter.incrementAndGet()
    );
}
_WARN_ログレベルを持つ新しいドキュメントがコレクションに保持されると、サブスクライバー(ラムダ式)がカウンターをインクリメントします。
最後に、ストリームを閉じるためにサブスクリプションを破棄する必要があります。
public void close() {
    this.subscription.dispose();
}
また、*テーラブルカーソルは、クエリが最初に一致しない場合に無効になるか、無効になる可能性があることに注意してください。*つまり、新しい永続ドキュメントがフィルタークエリに一致しても、サブスクライバーはそれらを受信できません。 これは、MongoDBテール可能カーソルの既知の制限です。 テール可能なカーソルを作成する前に、キャップされたコレクションに一致するドキュメントがあることを確認する必要があります。

5. リアクティブリポジトリを持つテール可能なカーソル

Spring Dataプロジェクトは、リアクティブバージョンを含むさまざまなデータストアのリポジトリ抽象化を提供します。
MongoDBも例外ではありません。 詳細については、「link:/spring-data-mongodb-reactive[MongoDBを使用したスプリングデータリアクティブリポジトリ]」の記事をご覧ください。
さらに、* MongoDBリアクティブリポジトリは、クエリメソッドに_ @ Tailable_で注釈を付けることにより、無限ストリームをサポートします。
public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
    @Tailable
    Flux<Log> findByLevel(LogLevel level);
}
この調整可能なリポジトリメソッドを使用して、_INFO_ログをカウントしましょう。
private Disposable subscription;

public InfoLogsCounter(LogsRepository repository) {
    Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
    this.subscription = stream.subscribe(logEntity ->
      counter.incrementAndGet()
    );
}
同様に、_WarnLogsCounter_に関しては、ストリームを閉じるためにサブスクリプションを破棄する必要があります。
public void close() {
    this.subscription.dispose();
}

6. _MessageListener_を使用したテール可能カーソル

それでも、リアクティブAPIを使用できない場合は、Springのメッセージングコンセプトを活用できます。
まず、送信された_SubscriptionRequest_オブジェクトを処理する_MessageListenerContainer_を作成する必要があります。 同期MongoDBドライバーは、上限付きコレクション内の新しいドキュメントをリッスンする、長時間実行されるブロッキングタスクを作成します。
Spring Data MongoDBには、_TailableCursorRequest:_の_Task_インスタンス*を作成および実行できるデフォルト実装が付属しています。
private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();

public ErrorLogsCounter(MongoTemplate mongoTemplate,
  String collectionName) {
    this.collectionName = collectionName;
    this.container = new DefaultMessageListenerContainer(mongoTemplate);

    container.start();
    TailableCursorRequest<Log> request = getTailableCursorRequest();
    container.register(request, Log.class);
}

private TailableCursorRequest<Log> getTailableCursorRequest() {
    MessageListener<Document, Log> listener = message ->
      counter.incrementAndGet();

    return TailableCursorRequest.builder()
      .collection(collectionName)
      .filter(query(where("level").is(LogLevel.ERROR)))
      .publishTo(listener)
      .build();
}
_TailableCursorRequest_は、_ERROR_レベルのログのみをフィルタリングするクエリを作成します。 一致する各ドキュメントは、カウンターをインクリメントする_MessageListener_に発行されます。
最初のクエリがいくつかの結果を返すことを確認する必要があることに注意してください。 それ以外の場合、テーラブルカーソルはすぐに閉じられます。
さらに、コンテナが不要になったら停止することを忘れないでください。
public void close() {
    container.stop();
}

7. 結論

調整可能なカーソルを備えたMongoDBキャップコレクションは、データベースから継続的に情報を受信するのに役立ちます。 明示的に閉じるまで結果を出し続けるクエリを実行できます。 Spring Data MongoDBは、テーラブルカーソルを利用するブロッキングとリアクティブの両方の方法を提供します。
完全な例のソースコードは、https://github.com/eugenp/tutorials/tree/master/spring-5-data-reactive [over on GitHub]で入手できます。
モバイルバージョンを終了