1. 序章

ksqlDB は、 ApacheKafkaおよびKafkaStreams上に構築されたリアルタイムのイベントストリーミングデータベースとして説明できます。 これは、強力なストリーム処理とSQL構文を使用したリレーショナルデータベースモデルを組み合わせたものです。

このチュートリアルでは、ksqlDBの基本的な概念を取り上げ、実際のユースケースを示すためのサンプルアプリケーションを作成します。

2. 概要

ksqlDBはイベントストリーミングデータベースであるため、ストリームとテーブルがそのコア抽象化です。 基本的に、これらはリアルタイムで変換および処理できるデータのコレクションです。

ストリーム処理により、これらの無制限のイベントストリームに対する継続的な計算が可能になります。 SQL を使用して、コレクションを変換、フィルタリング、集約、および結合して、新しいコレクションまたはマテリアライズドビューを派生させることができます。 さらに、新しいイベントはこれらのコレクションとビューを継続的に更新して、リアルタイムデータを提供します。

最後に、クエリはさまざまなストリーム処理操作の結果を公開します。 ksqlDBクエリは、従来のデータベースと同様に、非同期のリアルタイムアプリケーションフローと同期の要求/応答フローの両方をサポートします。

3. 設定

ksqlDBの動作を確認するために、イベント駆動型Javaアプリケーションを構築します。 これにより、さまざまなセンサーソースからの読み取り値の無制限のストリームが集約およびクエリされます。

主な使用例は、特定の期間内に、読み取り値の平均値が指定されたしきい値を超える状況を検出することです。 さらに、重要な要件は、アプリケーションが、たとえばダッシュボードや警告システムを構築するときに使用できるリアルタイムの情報を提供する必要があることです。

ksqlDB Javaクライアントを使用してサーバーと対話し、テーブルを作成し、クエリを集約し、さまざまなクエリを実行します。

3.1. Docker

ksqlDBはKafka上で実行されるため、Docker Composeを使用してKafkaコンポーネント、ksqlDBサーバー、およびksqlDBCLIクライアントを実行します。

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    ...

  broker:
    image: confluentinc/cp-kafka:6.2.0
    hostname: broker
    ...

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.19.0
    hostname: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    healthcheck:
      test: curl -f http://ksqldb-server:8088/ || exit 1
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.19.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

さらに、この docker-compose.yml ファイルをJavaアプリケーションで使用して、Testcontainersフレームワークを使用した統合テスト用の環境を起動します。

まず、次のコマンドを実行してスタックを起動しましょう。

docker-compose up

次に、すべてのサービスが開始されたら、インタラクティブCLIに接続しましょう。 これは、サーバーのテストと操作に役立ちます。

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

また、各トピックの最も早い時点からすべてのクエリを開始するようにksqlDBに指示します。

ksql> SET 'auto.offset.reset' = 'earliest';

3.2. 依存関係

このプロジェクトでは、主にJavaクライアントを使用してksqlDBと対話します。 具体的には、Confluent Platform(CP)にksqlDBを使用するため、CPMavenリポジトリをPOMファイルに追加する必要があります。

<repository>
    <id>confluent</id>
    <name>confluent-repo</name>
    <url>http://packages.confluent.io/maven/</url>
</repository>

次に、クライアントの依存関係を追加しましょう。

<dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>6.2.0</version>
</dependency>

4. リアルタイムデータ集約

このセクションでは、アプリケーションに必要なリアルタイムの集計を表すマテリアライズドビューを作成する方法を説明します。

4.1. ストリームの作成

Kafkaでは、トピックにイベントのコレクションが格納されます。 同様に、ksqkDBのでは、ストリームはKafkaトピックに裏打ちされたイベントを表します。

着信センサーデータを保存するストリームを作成することから始めましょう。

CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)
  WITH (KAFKA_TOPIC = 'readings',
        VALUE_FORMAT = 'JSON',
        TIMESTAMP = 'timestamp',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',
        PARTITIONS = 1);

ここで、ksqlDBは readings トピックを作成して、ストリームデータをJSON形式で保存します。 イベントは時間データを表すため、各読み取り値にイベント時刻を示すタイムスタンプが含まれていることが重要です。 timestamp フィールドは、このデータを指定された形式で格納します。 これにより、ksqlDBは、時間関連の操作と順序が正しくないイベントにイベント時間セマンティクスを適用します。

次に、ksqlDBサーバー接続の詳細を使用して Client のインスタンスを作成し、これを使用してSQLステートメントを実行します。

ClientOptions options = ClientOptions.create()
  .setHost(KSQLDB_SERVER_HOST)
  .setPort(KSQLDB_SERVER_PORT);

Client client = Client.create(options);

Map<String, Object> properties = Collections.singletonMap(
  "auto.offset.reset", "earliest"
);

CompletableFuture<ExecuteStatementResult> result = 
  client.executeStatement(CREATE_READINGS_STREAM, properties);

以前のCLIと同様に、auto.offset.resetプロパティの値を「earliest」に設定しました。 これにより、Kafkaオフセットがない場合に、クエリが最も早いオフセットから関連トピックを読み取ることが保証されます。

executeStatement メソッドは、クライアントによって提供される非同期APIの一部です。 サーバーにリクエストを送信する前に、すぐにCompleteableFutureを返します。 次に、呼び出し元のコードは、ブロックして完了を待つ(getまたはjoinメソッドを呼び出す)か、他の非ブロック操作を実行するかを決定できます。

4.2. マテリアライズド・ビューの作成

基礎となるイベントストリームができたので、readingsストリームから新しいalertsテーブルを派生させることができます。 この永続クエリ(またはマテリアライズドビュー)はサーバー上で無期限に実行され、ソースストリームまたはテーブルからのイベントを処理します。

この場合、センサーごとの平均読み取り値が30分間で25の値を超えると、アラートが発生するはずです。

CREATE TABLE alerts AS
  SELECT
    sensor_id,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') 
      AS start_period,
    TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') 
      AS end_period,
    AVG(reading) AS average_reading
  FROM readings
  WINDOW TUMBLING (SIZE 30 MINUTES)
  GROUP BY id 
  HAVING AVG(reading) > 25
  EMIT CHANGES;

このクエリでは、センサーごとに30分のタンブリングウィンドウに新しい着信イベントを集約しています。 また、 TIMESTAMPTOSTRING 関数を使用して、UNIXタイムスタンプをより読みやすいものに変換しました。

重要なことに、マテリアライズド・ビューは、新しいイベントが集計関数と正常に統合された場合にのみデータで更新されます。

前と同じように、クライアントを使用してこのステートメントを非同期で実行し、マテリアライズドビューを作成しましょう。

CompletableFuture<ExecuteStatementResult> result = 
  client.executeStatement(CREATE_ALERTS_TABLE, properties)

作成されると、そのようなビューは段階的に更新されます。 これは、リアルタイム更新のための効率的でパフォーマンスの高いクエリの鍵です。

4.3. サンプルデータの挿入

クエリを実行する前に、10分間隔でさまざまな読み取り値を表すサンプルイベントをいくつか作成しましょう。

KsqlObject を使用して、ストリーム列のキー/値マッピングを提供しましょう。

List<KsqlObject> rows = Arrays.asList(
  new KsqlObject().put("sensor_id", "sensor-1")
    .put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
  new KsqlObject().put("sensor_id", "sensor-1")
    .put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
  new KsqlObject().put("sensor_id", "sensor-2")
    .put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
  
  // additional rows
);

CompletableFuture<Void> result = CompletableFuture.allOf(
  rows.stream()
    .map(row -> client.insertInto(READINGS_TABLE, row))
    .toArray(CompletableFuture[]::new)
);

ここでは、便宜上、すべての個別の挿入操作を1つのFutureに結合します。 これは、基盤となるすべてのCompletableFutureインスタンスが正常に完了すると完了します。

5. データのクエリ

クエリを使用すると、呼び出し元はマテリアライズドビューデータをアプリケーションに取り込むことができます。 これらは2つのタイプに分類できます。

5.1. プッシュクエリ

このタイプのクエリは、更新の継続的なストリームをクライアントにプッシュします。 これらのクエリは、クライアントがリアルタイムで新しい情報に反応できるようにするため、特に非同期アプリケーションフローに適しています。

ただし、永続クエリとは異なり、サーバーはそのようなクエリの結果をKafkaトピックに保存しません。 したがって、これらのクエリを可能な限り単純に保ちながら、すべての面倒な作業を永続的なクエリに移す必要があります。

以前に作成したalertsマテリアライズドビューの結果をサブスクライブする簡単なプッシュクエリを作成しましょう。

SELECT * FROM alerts EMIT CHANGES;

ここで、 EMIT 句に注意することが重要です。この句は、すべての変更をクライアントに送信します。 クエリには制限がないため、終了するまですべての結果をストリーミングし続けます。

次に、ストリーミングデータを受信するために、クエリの結果をサブスクライブします。

public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
    return client.streamQuery(ALERTS_QUERY, PROPERTIES)
      .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
      .whenComplete((result, ex) -> {
          if (ex != null) {
              log.error("Alerts push query failed", ex);
          }
      });
}

ここでは、 streamQuery メソッドを呼び出しました。このメソッドは、ストリーミングデータを取得するためのStreamedQueryResultを返します。 これにより、PublisherインターフェースがReactiveStreamsから拡張されます。 したがって、リアクティブサブスクライバーを使用することで、結果を非同期で消費することができます。 実際、サブスクライバーは単純な Reactive Streams実装であり、ksqlDB行をJSONとして受け取り、それらを AlertPOJOに変換します。

これで、ComposeファイルとTestcontainersDockerComposeContainerを使用してこれをテストできます。

@Testcontainers
class KsqlDBApplicationLiveTest {

    @Container
    public static DockerComposeContainer dockerComposeContainer =
      new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
        .withServices("zookeeper", "broker", "ksqldb-server")
        .withExposedService("ksqldb-server", 8088,
          Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
        .withLocalCompose(true);

    // setup and teardown

    @Test
    void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
        createAlertsMaterializedView();
        
        // Reactive Streams Subscriber impl for receiving streaming data
        RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);

        ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
        insertSampleData();

        await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
          assertThat(alertSubscriber.consumedItems)
            .containsOnly(
              expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
              expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
            )
        );
    }
}

ここでは、統合テスト用の完全なksqlDB環境を作成しました。 テストはサンプル行をストリームに挿入し、ksqlDBはウィンドウ化された集計を実行します。 最後に、サブスクライバーが予想どおりに最新のアラートを消費していることを確認します。

5.2. プルクエリ

プッシュクエリとは対照的に、プルクエリは、従来のRDBMSのように、動的に更新されないデータを取得します。 このようなクエリは、有限の結果セットですぐに返されます。 したがって、プルクエリは、同期要求/応答アプリケーションフローに適しています。

簡単な例として、特定のセンサーIDに対してトリガーされたすべてのアラートを取得するクエリを作成しましょう。

String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";

List<Row> rows = client.executeQuery(pullQuery, PROPERTIES).get()

プッシュクエリとは対照的に、このクエリは実行時にマテリアライズドビューから利用可能なすべてのデータを返します。 これは、マテリアライズド・ビューの現在の状態を照会する場合に役立ちます。

5.3. その他の操作

クライアントのAPIドキュメントは、ソースの説明など、他の操作に関する詳細情報を提供します。 ストリーム、テーブル、トピックの一覧表示。 クエリの終了など。

6. 結論

この記事では、効率的なイベントストリーミングデータベースとしてksqlDBをサポートするストリーム、テーブル、およびクエリのコアコンセプトについて説明しました。

その過程で、簡潔で構成可能なSQL構造を使用して、シンプルでリアクティブなアプリケーションを構築しました。 また、Javaクライアントを使用してストリームとテーブルを作成し、マテリアライズドビューに対してクエリを発行してリアルタイムデータを取得する方法についても説明しました。

いつものように、完全なソースコードはGitHubから入手できます。