1. 序章

今日のアプリケーションでは、レプリカデータベース、検索操作を実行するための検索インデックス、データ読み取りを高速化するためのキャッシュストア、およびデータの複雑な分析のためのデータウェアハウスが必要になる場合があります。

さまざまなデータモデルとデータアクセスパターンをサポートする必要があるため、ほとんどのソフトウェアWeb開発者が解決する必要のある一般的な問題が発生します。そのとき、Change Data Capture(CDC)が役に立ちます。

この記事では、CDCの概要から始め、CDCで一般的に使用されるプラットフォームであるDebeziumに焦点を当てます。

2. CDCとは何ですか?

このセクションでは、CDCとは何か、CDCを使用する主な利点、およびいくつかの一般的な使用例について説明します。

2.1. データキャプチャの変更

Change Data Capture(CDC)は、手法でありデザインパターンです。 多くの場合、データベース間でリアルタイムにデータを複製するために使用します。

ソースデータベースに書き込まれたデータの変更を追跡し、ターゲットデータベースを自動的に同期することもできます。 CDCは増分ロードを可能にし、一括ロード更新の必要性を排除します

2.2. CDCの利点

今日でもほとんどの企業は、バッチ処理を使用してシステム間でデータを同期しています。 バッチ処理の使用:

  • データはすぐには同期されません
  • より多くの割り当てられたリソースがデータベースの同期に使用されます
  • データ複製は、指定されたバッチ期間中にのみ発生します

ただし、変更データのキャプチャにはいくつかの利点があります。

  • ソースデータベースの変更を常に追跡します
  • ターゲットデータベースを即座に更新します
  • ストリーム処理を使用して、即時の変更を保証します

CDCを使用すると、さまざまなデータベースが継続的に同期され、一括選択は過去のものとなります。 さらに、CDCは増分変更のみを転送するため、データ転送のコストが削減されます

2.3. 一般的なCDCのユースケース

さまざまなデータソースの同期を維持することによるデータレプリケーション、キャッシュの更新または無効化、検索インデックスの更新、マイクロサービスでのデータ同期など、CDCが解決に役立つさまざまなユースケースがあります。

CDCで何ができるかについて少し理解できたので、よく知られているオープンソースツールの1つにCDCがどのように実装されているかを見てみましょう。

3. デベジウムプラットフォーム

このセクションでは、 Debezium を紹介し、そのアーキテクチャを詳細に調べ、さまざまな展開方法を確認します。

3.1. デベジウムとは何ですか?

Debeziumは、 ApacheKafka上に構築されたCDCのオープンソースプラットフォームです。 その主な用途は、各ソースデータベーステーブルにコミットされたすべての行レベルの変更をトランザクションログに記録することです。 これらのイベントをリッスンする各アプリケーションは、増分データ変更に基づいて必要なアクションを実行できます。

Debeziumは、MySQL、MongoDB、PostgreSQLなどの複数のデータベースをサポートするコネクタのライブラリを提供します。

これらのコネクタは、データベースの変更を監視および記録し、Kafkaなどのストリーミングサービスに公開できます。

さらに、 Debeziumは、アプリケーションがダウンしている場合でも監視します。 再起動すると、中断したところからイベントの消費が開始されるため、何も見逃しません。

3.2. Debeziumアーキテクチャ

Debeziumの導入は、使用しているインフラストラクチャによって異なりますが、より一般的には、ApacheKafkaConnectを使用することがよくあります。

Kafka Connectは、Kafkaブローカーとは別のサービスとして機能するフレームワークです。 ApacheKafkaと他のシステム間のデータのストリーミングに使用しました。

Kafkaとの間でデータを転送するためのコネクタを定義することもできます。

次の図は、Debeziumに基づく変更データキャプチャパイプラインのさまざまな部分を示しています。

まず、左側に、データをコピーしてPostgreSQLやその他の分析データベースなどのターゲットデータベースで使用するMySQLソースデータベースがあります。

次に、 Kafka Connectコネクタは、トランザクションログを解析および解釈し、Kafkaトピックに書き込みます。

次に、Kafkaはメッセージブローカーとして機能し、チェンジセットをターゲットシステムに確実に転送します。

次に、右側に、KafkaコネクタがKafkaをポーリングし、変更をターゲットデータベースにプッシュします。

DebeziumはアーキテクチャでKafkaを利用していますが、インフラストラクチャのニーズを満たすために他の展開方法も提供しています。

Debeziumサーバーでスタンドアロンサーバーとして使用することも、ライブラリとしてアプリケーションコードに埋め込むこともできます。

これらのメソッドについては、次のセクションで説明します。

3.3. Debeziumサーバー

Debeziumは、ソースデータベースの変更をキャプチャするためのスタンドアロンサーバーを提供します。 Debeziumソースコネクタの1つを使用するように構成されています。

さらに、これらのコネクタは、AmazonKinesisやGoogleCloud Pub/Subなどのさまざまなメッセージングインフラストラクチャに変更イベントを送信します。

3.4. 埋め込まれたデベジウム

Kafka Connectは、Debeziumの展開に使用される場合、フォールトトレランスとスケーラビリティを提供します。 ただし、アプリケーションがそのレベルの信頼性を必要としない場合があり、インフラストラクチャのコストを最小限に抑えたいと考えています。

ありがたいことに、アプリケーション内にDebeziumエンジンを埋め込むことでこれを行うことができます。 これを行った後、コネクタを構成する必要があります。

4. 設定

このセクションでは、最初にアプリケーションのアーキテクチャから始めます。 次に、環境をセットアップする方法を確認し、Debeziumを統合するためのいくつかの基本的な手順に従います。

まず、アプリケーションの紹介から始めましょう。

4.1. サンプルアプリケーションのアーキテクチャ

アプリケーションをシンプルにするために、顧客管理用のSpring Bootアプリケーションを作成します。

お客様のモデルには、 ID フルネーム、、およびメールフィールドがあります。 データアクセス層には、SpringデータJPAを使用します。

とりわけ、私たちのアプリケーションはDebeziumの組み込みバージョンを実行します。 このアプリケーションアーキテクチャを視覚化してみましょう。

まず、Debezium Engineは、(別のシステムまたはアプリケーションからの)ソースMySQLデータベース上のcustomerテーブルのトランザクションログを追跡します。

次に、 customerテーブルでInsert/Update / Deleteなどのデータベース操作を実行するたびに、Debeziumコネクタはサービスメソッドを呼び出します。

最後に、これらのイベントに基づいて、そのメソッドはc ustomer テーブルのデータをターゲットのMySQLデータベース(アプリケーションのプライマリデータベース)に同期します。

4.2. Mavenの依存関係

まず、必要な依存関係pom.xmlに追加することから始めましょう。

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.4.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.4.2.Final</version>
</dependency>

同様に、アプリケーションが使用する各Debeziumコネクタに依存関係を追加します。

この例では、MySQLコネクタを使用します。

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.4.2.Final</version>
</dependency>

4.3. データベースのインストール

データベースを手動でインストールおよび構成できます。 ただし、処理を高速化するために、docker-composeファイルを使用します。

version: "3.9"
services:
  # Install Source MySQL DB and setup the Customer database
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - 3305:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  # Install Target MySQL DB and setup the Customer database
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

このファイルは、異なるポートで2つのデータベースインスタンスを実行します。

このファイルは、コマンド docker-compose up-dを使用して実行できます。

それでは、SQLスクリプトを実行してc ustomerテーブルを作成しましょう。

CREATE TABLE customer
(
    id integer NOT NULL,
    fullname character varying(255),
    email character varying(255),
    CONSTRAINT customer_pkey PRIMARY KEY (id)
);

5. 構成

このセクションでは、Debezium MySQLコネクタを構成し、EmbeddedDebeziumEngineを実行する方法を確認します。

5.1. Debeziumコネクタの構成

Debezium MySQLコネクタを構成するために、Debezium構成Beanを作成します。

@Bean
public io.debezium.config.Configuration customerConnector() {
    return io.debezium.config.Configuration.create()
        .with("name", "customer-mysql-connector")
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "/tmp/offsets.dat")
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", customerDbHost)
        .with("database.port", customerDbPort)
        .with("database.user", customerDbUsername)
        .with("database.password", customerDbPassword)
        .with("database.dbname", customerDbName)
        .with("database.include.list", customerDbName)
        .with("include.schema.changes", "false")
        .with("database.server.id", "10181")
        .with("database.server.name", "customer-mysql-db-server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", "/tmp/dbhistory.dat")
        .build();
}

この構成をさらに詳しく調べてみましょう。

このBean内のcreateメソッドは、ビルダーを使用してプロパティオブジェクトを作成します。

このビルダーは、優先コネクターに関係なく、engineに必要ないくつかのプロパティを設定します。 ソースMySQLデータベースを追跡するには、クラスMySqlConnectorを使用します。

このコネクタが実行されると、ソースからの変更の追跡が開始され、「オフセット」が記録されて、トランザクションログから処理されたデータの量が決定されます。

これらのオフセットを保存する方法はいくつかありますが、この例では、クラス FileOffsetBackingStore を使用して、ローカルファイルシステムにオフセットを保存します。

コネクタの最後のいくつかのパラメータは、MySQLデータベースのプロパティです。

構成ができたので、エンジンを作成できます。

5.2. Debeziumエンジンの実行

DebeziumEngine は、MySQLコネクタのラッパーとして機能します。 コネクタ構成を使用してエンジンを作成しましょう。

private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
      .using(customerConnectorConfiguration.asProperties())
      .notifying(this::handleEvent)
      .build();

    this.customerService = customerService;
}

さらに、エンジンはデータが変更されるたびにメソッドを呼び出します。この例では、handleChangeEventです。

このメソッドでは、最初に、 create()。を呼び出すときに、指定された形式に基づいてすべてのイベントを解析します。

次に、実行した操作を見つけ、 CustomerService を呼び出して、ターゲットデータベースで作成/更新/削除機能を実行します。

private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
    SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
    Struct sourceRecordChangeValue= (Struct) sourceRecord.value();

    if (sourceRecordChangeValue != null) {
        Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

        if(operation != Operation.READ) {
            String record = operation == Operation.DELETE ? BEFORE : AFTER;
            Struct struct = (Struct) sourceRecordChangeValue.get(record);
            Map<String, Object> payload = struct.schema().fields().stream()
              .map(Field::name)
              .filter(fieldName -> struct.get(fieldName) != null)
              .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
              .collect(toMap(Pair::getKey, Pair::getValue));

            this.customerService.replicateData(payload, operation);
        }
    }
}

DebeziumEngine オブジェクトを構成したので、サービスエグゼキューターを使用して非同期で開始しましょう。

private final Executor executor = Executors.newSingleThreadExecutor();

@PostConstruct
private void start() {
    this.executor.execute(debeziumEngine);
}

@PreDestroy
private void stop() throws IOException {
    if (this.debeziumEngine != null) {
        this.debeziumEngine.close();
    }
}

6. 動作中のデベジウム

コードの動作を確認するために、ソースデータベースのcustomerテーブルでデータを変更してみましょう。

6.1. レコードの挿入

customer テーブルに新しいレコードを追加するには、MySQLシェルに移動して次のコマンドを実行します。

INSERT INTO customerdb.customer (id, fullname, email) VALUES (1, 'John Doe', '[email protected]')

このクエリを実行すると、アプリケーションからの対応する出力が表示されます。

23:57:57.897 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746277000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=703,row=0,thread=19},op=c,ts_ms=1617746277422}'
Hibernate: insert into customer (email, fullname, id) values (?, ?, ?)
23:57:58.095 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: CREATE

最後に、新しいレコードがターゲットデータベースに挿入されたことを確認します。

id  fullname   email
1  John Doe   [email protected]

6.2. レコードの更新

それでは、最後に挿入された顧客を更新して、何が起こるかを確認してみましょう。

UPDATE customerdb.customer t SET t.email = '[email protected]' WHERE t.id = 1

その後、操作タイプが「UPDATE」に変更されることを除いて、insertで取得したのと同じ出力が取得されます。もちろん、Hibernateが使用するクエリは「update」クエリです。

00:08:57.893 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,[email protected]},after=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746937000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1040,row=0,thread=19},op=u,ts_ms=1617746937703}'
Hibernate: update customer set email=?, fullname=? where id=?
00:08:57.938 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: UPDATE

ジョンの電子メールがターゲットデータベースで変更されたことを確認できます。

id  fullname   email
1  John Doe   [email protected]

6.3. レコードの削除

これで、次を実行してcustomerテーブルのエントリを削除できます。

DELETE FROM customerdb.customer WHERE id = 1

同様に、ここでは操作とクエリが変更されています。

00:12:16.892 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,[email protected]},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617747136000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1406,row=0,thread=19},op=d,ts_ms=1617747136640}'
Hibernate: delete from customer where id=?
00:12:16.951 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, [email protected]} with Operation: DELETE

ターゲットデータベースでデータが削除されたことを確認できます。

select * from customerdb.customer where id= 1
0 rows retrieved

7. 結論

この記事では、CDCの利点とCDCが解決できる問題について説明しました。 また、それがないと、データの一括読み込みが必要になり、時間とコストの両方がかかることもわかりました。

また、CDCのユースケースを簡単に解決するのに役立つ優れたオープンソースプラットフォームであるDebeziumも見ました。

いつものように、記事の完全なソースコードは、GitHubから入手できます。