1. 序章

このチュートリアルでは、 Spring Data MongoDB で調整可能なカーソルを利用して、MongoDBを無限のデータストリームとして使用する方法について説明します。

2. テーラブルカーソル

クエリを実行すると、データベースドライバーはカーソルを開いて、一致するドキュメントを提供します。 デフォルトでは、クライアントがすべての結果を読み取ると、MongoDBはカーソルを自動的に閉じます。 したがって、回転すると有限のデータストリームになります。

ただし、クライアントが最初に返されたすべてのデータを消費した後でも、開いたままの調整可能なカーソルで上限付きコレクションを使用できます。これにより、無限のデータストリームが作成されます。このアプローチは、チャットなどのイベントストリームを処理するアプリケーションに役立ちます。メッセージ、または在庫の更新。

Spring Data MongoDBプロジェクトは、調整可能なカーソルを含むリアクティブデータベース機能の利用に役立ちます。

3. 設定

上記の機能を示すために、単純なログカウンターアプリケーションを実装します。 すべてのログを収集して中央の場所(MongoDBの上限付きコレクション)に保持するログアグリゲーターがあると仮定します。

まず、単純なLogエンティティを使用します。

@Document
public class Log {
    private @Id String id;
    private String service;
    private LogLevel level;
    private String message;
}

次に、ログをMongoDBの上限付きコレクションに保存します。 上限付きコレクションは、挿入順序に基づいてドキュメントを挿入および取得する固定サイズのコレクションです。 MongoOperations.createCollectionを使用してそれらを作成できます。

db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
  .capped(true)
  .sizeInBytes(1024)
  .maxDocuments(5));

上限付きコレクションの場合、sizeInBytesプロパティを定義する必要があります。 さらに、 maxDocuments は、コレクションが持つことができるドキュメントの最大数を指定します。 到達すると、古いドキュメントはコレクションから削除されます。

第三に、適切なSpring Bootスターター依存関係を使用します。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    <versionId>2.2.2.RELEASE</versionId>
</dependency>

4. リアクティブテーラブルカーソル

命令型とリアクティブMongoDBAPIの両方で調整可能なカーソルを使用できます。 リアクティブバリアントを使用することを強くお勧めします。

リアクティブアプローチを使用して、WARNレベルのログカウンターを実装しましょう。 ReactiveMongoOperations.tailメソッドを使用して無限のストリームクエリを作成できます。

新しいドキュメントが上限付きコレクションに到着し、フィルタークエリと一致すると、調整可能なカーソルが開いたままになり、データ(エンティティのフラックス)が出力されます。

private Disposable subscription;

public WarnLogsCounter(ReactiveMongoOperations template) {
    Flux<Log> stream = template.tail(
      query(where("level").is(LogLevel.WARN)), 
      Log.class);
    subscription = stream.subscribe(logEntity -> 
      counter.incrementAndGet()
    );
}

WARN ログレベルの新しいドキュメントがコレクションに保持されると、サブスクライバー(ラムダ式)はカウンターをインクリメントします。

最後に、サブスクリプションを破棄してストリームを閉じる必要があります。

public void close() {
    this.subscription.dispose();
}

また、クエリが最初に一致を返さない場合、調整可能なカーソルが無効になるか、無効になる可能性があることに注意してください。つまり、新しい永続ドキュメントがフィルタークエリに一致しても、サブスクライバーはそれらを受信できません。 。 これは、MongoDBの調整可能なカーソルの既知の制限です。 調整可能なカーソルを作成する前に、キャップされたコレクションに一致するドキュメントがあることを確認する必要があります。

5. リアクティブリポジトリを備えたテーラブルカーソル

Spring Dataプロジェクトは、リアクティブバージョンを含むさまざまなデータストアのリポジトリ抽象化を提供します。

MongoDBも例外ではありません。 詳細については、 Spring Data Reactive Repositories withMongoDBの記事を確認してください。

さらに、 MongoDBリアクティブリポジトリは、クエリメソッドに@Tailableアノテーションを付けることで、無限ストリームをサポートします。 Fluxまたは複数の要素を発行できるその他のリアクティブタイプを返すリポジトリメソッドにアノテーションを付けることができます。

public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
    @Tailable
    Flux<Log> findByLevel(LogLevel level);
}

この調整可能なリポジトリメソッドを使用して、INFOログをカウントしてみましょう。

private Disposable subscription;

public InfoLogsCounter(LogsRepository repository) {
    Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
    this.subscription = stream.subscribe(logEntity -> 
      counter.incrementAndGet()
    );
}

同様に、 WarnLogsCounter の場合、ストリームを閉じるためにサブスクリプションを破棄する必要があります。

public void close() {
    this.subscription.dispose();
}

6. MessageListenerを使用したテール可能カーソル

それでも、リアクティブAPIを使用できない場合は、Springのメッセージングの概念を活用できます。

まず、送信されたSubscriptionRequestオブジェクトを処理するMessageListenerContainerを作成する必要があります。 同期MongoDBドライバーは、上限のあるコレクション内の新しいドキュメントをリッスンする、実行時間の長いブロッキングタスクを作成します。

Spring Data MongoDBには、 TaylorableCursorRequest:のタスクインスタンスを作成および実行できるデフォルトの実装が付属しています。

private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();

public ErrorLogsCounter(MongoTemplate mongoTemplate,
  String collectionName) {
    this.collectionName = collectionName;
    this.container = new DefaultMessageListenerContainer(mongoTemplate);

    container.start();
    TailableCursorRequest<Log> request = getTailableCursorRequest();
    container.register(request, Log.class);
}

private TailableCursorRequest<Log> getTailableCursorRequest() {
    MessageListener<Document, Log> listener = message -> 
      counter.incrementAndGet();

    return TailableCursorRequest.builder()
      .collection(collectionName)
      .filter(query(where("level").is(LogLevel.ERROR)))
      .publishTo(listener)
      .build();
}

TailableCursorRequest は、ERRORレベルのログのみをフィルタリングするクエリを作成します。 一致する各ドキュメントはMessageListenerに公開され、カウンターがインクリメントされます。

最初のクエリがいくつかの結果を返すことを確認する必要があることに注意してください。 それ以外の場合、調整可能なカーソルはすぐに閉じられます。

さらに、コンテナが不要になったら、コンテナを停止することを忘れないでください。

public void close() {
    container.stop();
}

7. 結論

調整可能なカーソルを持つMongoDBの上限付きコレクションは、データベースから継続的に情報を受け取るのに役立ちます。 明示的に閉じるまで結果を出し続けるクエリを実行できます。 Spring Data MongoDBは、調整可能なカーソルを利用するブロッキングとリアクティブの両方の方法を提供します。

完全な例のソースコードは、GitHubから入手できます。