開発者ドキュメント

Debian9でApacheKafkaデータをバックアップ、インポート、および移行する方法

著者は、 Write for DOnations プログラムの一環として、 Tech EducationFundを選択して寄付を受け取りました。

序章

Apache Kafka データのバックアップは、ユーザーエラーによってクラスターに追加された意図しないデータ損失や不良データから回復するのに役立つ重要なプラクティスです。 クラスタおよびトピックデータのデータダンプは、バックアップと復元を実行するための効率的な方法です。

バックアップされたデータを別のサーバーにインポートして移行すると、サーバーのハードウェアまたはネットワークの障害が原因でKafkaインスタンスが使用できなくなり、古いデータを使用して新しいKafkaインスタンスを作成する必要がある場合に役立ちます。 バックアップされたデータのインポートと移行は、リソース使用量の変更のためにKafkaインスタンスをアップグレードまたはダウングレードされたサーバーに移動する場合にも役立ちます。

このチュートリアルでは、単一のDebian 9インストールと、別々のサーバー上の複数のDebian 9インストールで、Kafkaデータをバックアップ、インポート、および移行します。 ZooKeeperは、Kafkaの運用の重要なコンポーネントです。 コンシューマーデータ、パーティションデータ、クラスター内の他のブローカーの状態など、クラスターの状態に関する情報を格納します。 そのため、このチュートリアルではZooKeeperのデータもバックアップします。

前提条件

フォローするには、次のものが必要です。

ステップ1—テストトピックの作成とメッセージの追加

Kafka メッセージは、Kafkaのデータストレージの最も基本的な単位であり、Kafkaに公開およびKafkaからサブスクライブするエンティティです。 Kafka topic は、関連するメッセージのグループのコンテナーのようなものです。 特定のトピックをサブスクライブすると、その特定のトピックに公開されたメッセージのみが受信されます。 このセクションでは、バックアップするサーバー(ソースサーバー)にログインし、Kafkaトピックとメッセージを追加して、バックアップ用にデータを入力します。

このチュートリアルでは、 kafka ユーザー(/home/kafka/kafka)のホームディレクトリにKafkaがインストールされていることを前提としています。 インストールが別のディレクトリにある場合は、次のコマンドの~/kafkaの部分を、Kafkaインストールのパスを使用して変更し、このチュートリアルの残りの部分のコマンドを変更します。

次のコマンドを実行して、ソースサーバーにSSHで接続します。

  1. ssh sammy@source_server_ip

次のコマンドを実行して、kafkaユーザーとしてログインします。

  1. sudo -iu kafka

次のように入力して、Kafkaインストールのbinディレクトリにあるkafka-topics.shシェルユーティリティファイルを使用して、BackupTopicという名前のトピックを作成します。

  1. ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BackupTopic

~/kafka/bin/kafka-console-producer.shシェルユーティリティスクリプトを使用して、文字列"Test Message 1"BackupTopicトピックに公開します。

ここにメッセージを追加したい場合は、ここで追加できます。

  1. echo "Test Message 1" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic BackupTopic > /dev/null

~/kafka/bin/kafka-console-producer.shファイルを使用すると、コマンドラインから直接メッセージを公開できます。 通常、プログラム内からKafkaクライアントライブラリを使用してメッセージを公開しますが、プログラミング言語ごとに異なる設定が必要になるため、テスト中または管理タスクの実行中に、言語に依存しない方法としてシェルスクリプトを使用できます。 --topicフラグは、メッセージを公開するトピックを指定します。

次に、次のコマンドを実行して、kafka-console-producer.shスクリプトがメッセージを公開したことを確認します。

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

~/kafka/bin/kafka-console-consumer.shシェルスクリプトがコンシューマーを起動します。 開始すると、前のコマンドの"Test Message 1"メッセージで公開したトピックからのメッセージをサブスクライブします。 コマンドの--from-beginningフラグを使用すると、コンシューマーが開始される前に公開されたメッセージを消費できます。 フラグを有効にしない場合、コンシューマーの開始後に公開されたメッセージのみが表示されます。 コマンドを実行すると、ターミナルに次の出力が表示されます。

Output
Test Message 1

CTRL+Cを押して、コンシューマーを停止します。

いくつかのテストデータを作成し、それが永続化されていることを確認しました。 これで、次のセクションで状態データをバックアップできます。

ステップ2—ZooKeeperの状態データをバックアップする

実際のKafkaデータをバックアップする前に、ZooKeeperに保存されているクラスターの状態をバックアップする必要があります。

ZooKeeperは、~/kafka/config/zookeeper.properties構成ファイルのdataDirフィールドで指定されたディレクトリにデータを保存します。 バックアップするディレクトリを決定するには、このフィールドの値を読み取る必要があります。 デフォルトでは、dataDir/tmp/zookeeperディレクトリを指します。 インストールで値が異なる場合は、次のコマンドで/tmp/zookeeperをその値に置き換えます。

~/kafka/config/zookeeper.propertiesファイルの出力例を次に示します。

〜/ kafka / config / zookeeper.properties
...
...
...
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
...
...
...

ディレクトリへのパスがわかったので、その内容の圧縮アーカイブファイルを作成できます。 圧縮されたアーカイブファイルは、ディスク容量を節約するために通常のアーカイブファイルよりも優れたオプションです。 次のコマンドを実行します。

  1. tar -czf /home/kafka/zookeeper-backup.tar.gz /tmp/zookeeper/*

コマンドの出力tar: Removing leading / from member namesは無視しても問題ありません。

-cおよび-zフラグは、tarにアーカイブを作成し、アーカイブにgzip圧縮を適用するように指示します。 -fフラグは、出力圧縮アーカイブファイルの名前を指定します。この場合はzookeeper-backup.tar.gzです。

現在のディレクトリでlsを実行すると、出力の一部としてzookeeper-backup.tar.gzが表示されます。

これで、ZooKeeperデータのバックアップに成功しました。 次のセクションでは、実際のKafkaデータをバックアップします。

ステップ3—Kafkaのトピックとメッセージをバックアップする

このセクションでは、前の手順でZooKeeperに対して行ったように、Kafkaのデータディレクトリを圧縮されたtarファイルにバックアップします。

Kafkaは、トピック、メッセージ、および内部ファイルを、log.dirsフィールドが~/kafka/config/server.properties構成ファイルで指定するディレクトリに保存します。 バックアップするディレクトリを決定するには、このフィールドの値を読み取る必要があります。 デフォルトおよび現在のインストールでは、log.dirs/tmp/kafka-logsディレクトリを指します。 インストールで値が異なる場合は、次のコマンドの/tmp/kafka-logsを正しい値に置き換えてください。

~/kafka/config/server.propertiesファイルの出力例を次に示します。

〜/ kafka / config / server.properties
...
...
...
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
...
...
...

まず、tarでアーカイブを作成するときに、log.dirsディレクトリ内のデータが一貫した状態になるように、Kafkaサービスを停止します。 これを行うには、exitと入力してサーバーの非ルートユーザーに戻り、次のコマンドを実行します。

  1. sudo systemctl stop kafka

Kafkaサービスを停止した後、kafkaユーザーとして次のコマンドで再度ログインします。

  1. sudo -iu kafka

Apache Kafkaのインストール前提条件で、セキュリティ上の予防措置として kafka ユーザーを制限したため、root以外のsudoユーザーとしてKafkaおよびZooKeeperサービスを停止/開始する必要があります。 前提条件のこの手順により、 kafka ユーザーのsudoアクセスが無効になり、コマンドの実行に失敗します。

次に、次のコマンドを実行して、ディレクトリの内容の圧縮アーカイブファイルを作成します。

  1. tar -czf /home/kafka/kafka-backup.tar.gz /tmp/kafka-logs/*

繰り返しになりますが、コマンドの出力(tar: Removing leading / from member names)は無視しても問題ありません。

現在のディレクトリでlsを実行すると、出力の一部としてkafka-backup.tar.gzが表示されます。

exitと入力し、root以外のsudoユーザーに切り替えてから、次のコマンドを実行することで、データをすぐに復元したくない場合は、Kafkaサービスを再開できます。

  1. sudo systemctl start kafka

kafkaユーザーとして再度ログインします。

  1. sudo -iu kafka

これで、Kafkaデータのバックアップに成功しました。 これで、ZooKeeperに保存されているクラスター状態データを復元する次のセクションに進むことができます。

ステップ4—ZooKeeperデータの復元

このセクションでは、ユーザーがトピックの作成、追加ノードの追加/削除、メッセージの追加と消費などの操作を実行するときに、Kafkaが内部で作成および管理するクラスター状態データを復元します。 ZooKeeperデータディレクトリを削除し、zookeeper-backup.tar.gzファイルの内容を復元することにより、データを既存のソースインストールに復元します。 別のサーバーにデータを復元する場合は、手順7を参照してください。

復元プロセス中に無効なデータを受信するデータディレクトリに対する予防策として、KafkaおよびZooKeeperサービスを停止する必要があります。

まず、exitと入力して、Kafkaサービスを停止し、root以外のsudoユーザーに切り替えてから、次のコマンドを実行します。

  1. sudo systemctl stop kafka

次に、ZooKeeperサービスを停止します。

  1. sudo systemctl stop zookeeper

kafkaユーザーとして再度ログインします。

  1. sudo -iu kafka

次に、次のコマンドを使用して、既存のクラスターデータディレクトリを安全に削除できます。

  1. rm -r /tmp/zookeeper/*

次に、手順2でバックアップしたデータを復元します。

  1. tar -C /tmp/zookeeper -xzf /home/kafka/zookeeper-backup.tar.gz --strip-components 2

-Cフラグは、データを抽出する前にtarにディレクトリ/tmp/zookeeperに移動するように指示します。 --strip 2フラグを指定して、tarがアーカイブの内容を/tmp/zookeeper/自体に抽出し、アーカイブ内の別のディレクトリ(/tmp/zookeeper/tmp/zookeeper/など)には抽出しないようにします。

これで、クラスター状態データが正常に復元されました。 これで、次のセクションのKafkaデータ復元プロセスに進むことができます。

ステップ5—Kafkaデータの復元

このセクションでは、Kafkaデータディレクトリを削除して圧縮アーカイブファイルを復元することにより、バックアップされたKafkaデータを既存のソースインストール(またはオプションの手順7に従った場合は宛先サーバー)に復元します。 これにより、復元が正常に機能することを確認できます。

次のコマンドを使用して、既存のKafkaデータディレクトリを安全に削除できます。

  1. rm -r /tmp/kafka-logs/*

データを削除したので、Kafkaのインストールは、トピックやメッセージが含まれていない新しいインストールに似ています。 バックアップしたデータを復元するには、次のコマンドを実行してファイルを抽出します。

  1. tar -C /tmp/kafka-logs -xzf /home/kafka/kafka-backup.tar.gz --strip-components 2

-Cフラグは、データを抽出する前にtarにディレクトリ/tmp/kafka-logsに移動するように指示します。 --strip 2フラグを指定して、アーカイブの内容が/tmp/kafka-logs/自体に抽出され、アーカイブ内の別のディレクトリ(/tmp/kafka-logs/kafka-logs/など)には抽出されないようにします。

データが正常に抽出されたので、exitと入力して、root以外のsudoユーザーに切り替えてから、次のコマンドを実行することで、KafkaおよびZooKeeperサービスを再開できます。

  1. sudo systemctl start kafka

次のコマンドでZooKeeperサービスを開始します。

  1. sudo systemctl start zookeeper

kafkaユーザーとして再度ログインします。

  1. sudo -iu kafka

kafkaデータを復元しました。次のセクションで、復元が成功したことの確認に進むことができます。

ステップ6—復元の確認

Kafkaデータの復元をテストするには、手順1で作成したトピックからのメッセージを使用します。

Kafkaが起動するまで数分待ってから、次のコマンドを実行してBackupTopicからのメッセージを読み取ります。

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

次のような警告が表示された場合は、Kafkaが完全に起動するのを待つ必要があります。

Output
[2018-09-13 15:52:45,234] WARN [Consumer clientId=consumer-1, groupId=console-consumer-87747] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

さらに数分後に前のコマンドを再試行するか、root以外のsudoユーザーとしてsudo systemctl restart kafkaを実行してください。 復元に問題がない場合は、次の出力が表示されます。

Output
Test Message 1

このメッセージが表示されない場合は、前のセクションのコマンドを見逃していないかどうかを確認して実行できます。

復元されたKafkaデータを確認したので、これは、単一のKafkaインストールでデータを正常にバックアップおよび復元したことを意味します。 手順7に進んで、クラスターとトピックのデータを別のサーバーのインストールに移行する方法を確認できます。

ステップ7—バックアップを別のKafkaサーバーに移行および復元する(オプション)

このセクションでは、バックアップされたデータをソースのKafkaサーバーから宛先のKafkaサーバーに移行します。 これを行うには、最初にscpコマンドを使用して、圧縮されたtar.gzファイルをローカルシステムにダウンロードします。 次に、scpを再度使用して、ファイルを宛先サーバーにプッシュします。 ファイルが移行先サーバーに存在するようになったら、前に使用した手順に従ってバックアップを復元し、移行が成功したことを確認できます。

バックアップファイルをローカルにダウンロードしてから、ソースサーバーから宛先サーバーに直接コピーするのではなく、宛先サーバーにアップロードします。これは、宛先サーバーの/home/sammy/.ssh/authorized_keysファイルにソースサーバーのSSHキーが含まれていないためです。ソースサーバーとの間で接続できません。 ただし、ローカルマシンは両方のサーバーに接続できるため、送信元サーバーから宛先サーバーへのSSHアクセスを設定する追加の手順を省くことができます。

次のコマンドを実行して、zookeeper-backup.tar.gzおよびkafka-backup.tar.gzファイルをローカルマシンにダウンロードします。

  1. scp sammy@source_server_ip:/home/kafka/zookeeper-backup.tar.gz .

次のような出力が表示されます。

Output
zookeeper-backup.tar.gz 100% 68KB 128.0KB/s 00:00

次に、次のコマンドを実行して、kafka-backup.tar.gzファイルをローカルマシンにダウンロードします。

  1. scp sammy@source_server_ip:/home/kafka/kafka-backup.tar.gz .

次の出力が表示されます。

Output
kafka-backup.tar.gz 100% 1031KB 488.3KB/s 00:02

ローカルマシンの現在のディレクトリでlsを実行すると、両方のファイルが表示されます。

Output
kafka-backup.tar.gz zookeeper.tar.gz

次のコマンドを実行して、zookeeper-backup.tar.gzファイルを宛先サーバーの/home/kafka/に転送します。

  1. scp zookeeper-backup.tar.gz sammy@destination_server_ip:/home/sammy/zookeeper-backup.tar.gz

次に、次のコマンドを実行して、kafka-backup.tar.gzファイルを宛先サーバーの/home/kafka/に転送します。

  1. scp kafka-backup.tar.gz sammy@destination_server_ip:/home/sammy/kafka-backup.tar.gz

バックアップファイルが宛先サーバーに正常にアップロードされました。 ファイルは/home/sammy/ディレクトリにあり、 kafka ユーザーがアクセスするための適切な権限がないため、ファイルを/home/kafka/ディレクトリに移動して変更できます。権限。

次のコマンドを実行して、宛先サーバーにSSHで接続します。

  1. ssh sammy@destination_server_ip

次に、以下を実行して、zookeeper-backup.tar.gz/home/kafka/に移動します。

  1. sudo mv zookeeper-backup.tar.gz /home/sammy/zookeeper-backup.tar.gz

同様に、次のコマンドを実行して、kafka-backup.tar.gz/home/kafka/にコピーします。

  1. sudo mv kafka-backup.tar.gz /home/kafka/kafka-backup.tar.gz

次のコマンドを実行して、バックアップファイルの所有者を変更します。

  1. sudo chown kafka /home/kafka/zookeeper-backup.tar.gz /home/kafka/kafka-backup.tar.gz

以前のmvおよびchownコマンドは出力を表示しません。

バックアップファイルが宛先サーバーの正しいディレクトリにあるので、このチュートリアルのステップ4から6にリストされているコマンドに従って、宛先サーバーのデータを復元および確認します。

結論

このチュートリアルでは、Kafkaのトピックとメッセージを、同じインストールと別々のサーバーへのインストールの両方からバックアップ、インポート、および移行しました。 Kafkaの他の便利な管理タスクについて詳しく知りたい場合は、Kafkaの公式ドキュメントの操作セクションを参照してください。

zookeeper-backup.tar.gzkafka-backup.tar.gzなどのバックアップファイルをリモートで保存するには、 DigitalOceanSpacesを探索できます。 Kafkaがサーバーで実行されている唯一のサービスである場合は、フルインスタンスバックアップなどの他のバックアップ方法を調べることもできます。

モバイルバージョンを終了