序章

Apache Kafka データのバックアップは、ユーザーエラーによってクラスターに追加された意図しないデータ損失や不良データから回復するのに役立つ重要な方法です。 クラスタおよびトピックデータのデータダンプは、バックアップと復元を実行するための効率的な方法です。

バックアップしたデータを別のサーバーにインポートして移行すると、サーバーのハードウェアやネットワークの障害が原因でKafkaインスタンスが使用できなくなり、古いデータを使用して新しいKafkaインスタンスを作成する必要がある場合に役立ちます。 バックアップされたデータのインポートと移行は、リソース使用量の変更のためにKafkaインスタンスをアップグレードまたはダウングレードされたサーバーに移動する場合にも役立ちます。

このチュートリアルでは、単一のUbuntu 18.04インストールと、別々のサーバー上の複数のUbuntu 18.04インストールで、Kafkaデータをバックアップ、インポート、および移行します。 ZooKeeperは、Kafkaの運用の重要なコンポーネントです。 コンシューマーデータ、パーティションデータ、クラスター内の他のブローカーの状態など、クラスターの状態に関する情報を格納します。 そのため、このチュートリアルではZooKeeperのデータもバックアップします。

前提条件

フォローするには、次のものが必要です。

  • チュートリアルに従ってセットアップされた、少なくとも4GBのRAMと非rootsudoユーザーを備えたUbuntu18.04サーバー。

  • バックアップのソースとして機能する、ApacheKafkaがインストールされたUbuntu18.04サーバー。 Ubuntu 18.04にApacheKafkaをインストールする方法ガイドに従って、Kafkaがソースサーバーにまだインストールされていない場合は、Kafkaのインストールをセットアップします。

  • OpenJDK8がサーバーにインストールされています。 このバージョンをインストールするには、OpenJDKの特定のバージョンをインストールする際にこれらの指示に従ってください。

  • ステップ7のオプション—バックアップの宛先として機能するApacheKafkaがインストールされた別のUbuntu18.04サーバー。 前の前提条件の記事のリンクに従って、移行先サーバーにKafkaをインストールします。 この前提条件は、Kafkaデータをあるサーバーから別のサーバーに移動する場合にのみ必要です。 Kafkaデータをバックアップして単一のサーバーにインポートする場合は、この前提条件をスキップできます。

ステップ1—テストトピックの作成とメッセージの追加

Kafka メッセージは、Kafkaのデータストレージの最も基本的な単位であり、Kafkaに公開およびKafkaからサブスクライブするエンティティです。 Kafka topic は、関連するメッセージのグループのコンテナーのようなものです。 特定のトピックをサブスクライブすると、その特定のトピックに公開されたメッセージのみが受信されます。 このセクションでは、バックアップするサーバー(ソースサーバー)にログインし、Kafkaトピックとメッセージを追加して、バックアップ用にデータを入力します。

このチュートリアルでは、 kafka ユーザーのホームディレクトリにKafkaがインストールされていることを前提としています(/home/kafka/kafka). インストールが別のディレクトリにある場合は、 ~/kafka Kafkaインストールのパスを使用して次のコマンドに参加し、このチュートリアルの残りの部分でコマンドを実行します。

次のコマンドを実行して、ソースサーバーにSSHで接続します。

  1. ssh sammy@source_server_ip

次のコマンドを実行して、kafkaユーザーとしてログインします。

  1. sudo -iu kafka

名前の付いたトピックを作成します BackupTopic を使用して kafka-topics.sh 次のように入力して、Kafkaインストールのbinディレクトリにあるシェルユーティリティファイル。

  1. ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BackupTopic

文字列を公開する "Test Message 1"BackupTopic を使用してトピック ~/kafka/bin/kafka-console-producer.sh シェルユーティリティスクリプト。

ここにメッセージを追加したい場合は、ここで追加できます。

  1. echo "Test Message 1" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic BackupTopic > /dev/null

The ~/kafka/bin/kafka-console-producer.sh fileを使用すると、コマンドラインから直接メッセージを公開できます。 通常、プログラム内からKafkaクライアントライブラリを使用してメッセージを公開しますが、プログラミング言語ごとに異なる設定が必要になるため、テスト中または管理タスクの実行中に、言語に依存しない方法としてシェルスクリプトを使用できます。 The --topic flagは、メッセージを公開するトピックを指定します。

次に、 kafka-console-producer.sh スクリプトは、次のコマンドを実行してメッセージを公開しました。

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

The ~/kafka/bin/kafka-console-consumer.sh シェルスクリプトがコンシューマーを起動します。 開始すると、で公開したトピックからのメッセージをサブスクライブします "Test Message 1" 前のコマンドのメッセージ。 The --from-beginning コマンドのフラグを使用すると、コンシューマーが開始される前に発行されたメッセージを消費できます。 フラグを有効にしない場合、コンシューマーの開始後に公開されたメッセージのみが表示されます。 コマンドを実行すると、ターミナルに次の出力が表示されます。

Output
Test Message 1

プレス CTRL+C 消費者を止めるために。

いくつかのテストデータを作成し、それが永続化されていることを確認しました。 これで、次のセクションで状態データをバックアップできます。

ステップ2—ZooKeeperの状態データをバックアップする

実際のKafkaデータをバックアップする前に、ZooKeeperに保存されているクラスターの状態をバックアップする必要があります。

ZooKeeperは、そのデータを指定されたディレクトリに保存します。 dataDir のフィールド ~/kafka/config/zookeeper.properties 構成ファイル。 バックアップするディレクトリを決定するには、このフィールドの値を読み取る必要があります。 デフォルトでは、 dataDir を指す /tmp/zookeeper ディレクトリ。 インストールで値が異なる場合は、 /tmp/zookeeper 次のコマンドでその値を使用します。

これが出力例です ~/kafka/config/zookeeper.properties ファイル:

〜/ kafka / config / zookeeper.properties
...
...
...
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
...
...
...

ディレクトリへのパスがわかったので、その内容の圧縮アーカイブファイルを作成できます。 圧縮されたアーカイブファイルは、ディスク領域を節約するために通常のアーカイブファイルよりも優れたオプションです。 次のコマンドを実行します。

  1. tar -czf /home/kafka/zookeeper-backup.tar.gz /tmp/zookeeper/*

コマンドの出力 tar: Removing leading / from member names 無視しても問題ありません。

The -c-z フラグが伝える tar アーカイブを作成し、アーカイブにgzip圧縮を適用します。 The -f flagは、出力圧縮アーカイブファイルの名前を指定します。 zookeeper-backup.tar.gz この場合。

あなたが実行することができます ls 現在のディレクトリで確認してください zookeeper-backup.tar.gz 出力の一部として。

これで、ZooKeeperデータのバックアップに成功しました。 次のセクションでは、実際のKafkaデータをバックアップします。

ステップ3—Kafkaのトピックとメッセージをバックアップする

このセクションでは、前の手順でZooKeeperに対して行ったように、Kafkaのデータディレクトリを圧縮されたtarファイルにバックアップします。

Kafkaは、トピック、メッセージ、および内部ファイルを、 log.dirs フィールドはで指定します ~/kafka/config/server.properties 構成ファイル。 バックアップするディレクトリを決定するには、このフィールドの値を読み取る必要があります。 デフォルトおよび現在のインストールでは、 log.dirs を指す /tmp/kafka-logs ディレクトリ。 インストールで値が異なる場合は、 /tmp/kafka-logs 次のコマンドで正しい値を使用してください。

これが出力例です ~/kafka/config/server.properties ファイル:

〜/ kafka / config / server.properties
...
...
...
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
...
...
...

まず、Kafkaサービスを停止して、 log.dirs でアーカイブを作成するとき、ディレクトリは一貫した状態にあります tar. これを行うには、次のように入力して、サーバーのroot以外のユーザーに戻ります。 exit 次に、次のコマンドを実行します。

  1. sudo systemctl stop kafka

Kafkaサービスを停止した後、kafkaユーザーとして次のコマンドで再度ログインします。

  1. sudo -iu kafka

Apache Kafkaのインストール前提条件で、セキュリティ上の予防措置として kafka ユーザーを制限したため、root以外のsudoユーザーとしてKafkaおよびZooKeeperサービスを停止/開始する必要があります。 前提条件のこの手順により、 kafka ユーザーのsudoアクセスが無効になり、コマンドの実行に失敗します。

次に、次のコマンドを実行して、ディレクトリの内容の圧縮アーカイブファイルを作成します。

  1. tar -czf /home/kafka/kafka-backup.tar.gz /tmp/kafka-logs/*

繰り返しになりますが、コマンドの出力を安全に無視できます(tar: Removing leading / from member names).

あなたが実行することができます ls 現在のディレクトリで確認する kafka-backup.tar.gz 出力の一部として。

データをすぐに復元したくない場合は、次のように入力して、Kafkaサービスを再開できます。 exit、root以外のsudoユーザーに切り替えて、次のコマンドを実行します。

  1. sudo systemctl start kafka

kafkaユーザーとして再度ログインします。

  1. sudo -iu kafka

これで、Kafkaデータが正常にバックアップされました。 これで、ZooKeeperに保存されているクラスター状態データを復元する次のセクションに進むことができます。

ステップ4—ZooKeeperデータの復元

このセクションでは、ユーザーがトピックの作成、追加ノードの追加/削除、メッセージの追加と消費などの操作を実行するときに、Kafkaが内部で作成および管理するクラスター状態データを復元します。 ZooKeeperデータディレクトリを削除し、その内容を復元することにより、既存のソースインストールにデータを復元します。 zookeeper-backup.tar.gz ファイル。 別のサーバーにデータを復元する場合は、手順7を参照してください。

復元プロセス中に無効なデータを受信するデータディレクトリに対する予防策として、KafkaおよびZooKeeperサービスを停止する必要があります。

まず、次のように入力してKafkaサービスを停止します exit、root以外のsudoユーザーに切り替えて、次のコマンドを実行します。

  1. sudo systemctl stop kafka

次に、ZooKeeperサービスを停止します。

  1. sudo systemctl stop zookeeper

kafkaユーザーとして再度ログインします。

  1. sudo -iu kafka

次に、次のコマンドを使用して、既存のクラスターデータディレクトリを安全に削除できます。

  1. rm -r /tmp/zookeeper/*

次に、手順2でバックアップしたデータを復元します。

  1. tar -C /tmp/zookeeper -xzf /home/kafka/zookeeper-backup.tar.gz --strip-components 2

The -C フラグが伝えます tar ディレクトリに移動します /tmp/zookeeper データを抽出する前に。 指定します --strip 2 作成するフラグ tar アーカイブの内容を抽出します /tmp/zookeeper/ それ自体であり、別のディレクトリ(たとえば /tmp/zookeeper/tmp/zookeeper/)その中。

これで、クラスター状態データが正常に復元されました。 これで、次のセクションのKafkaデータ復元プロセスに進むことができます。

ステップ5—Kafkaデータの復元

このセクションでは、Kafkaデータディレクトリを削除して圧縮アーカイブファイルを復元することにより、バックアップされたKafkaデータを既存のソースインストール(またはオプションの手順7に従った場合は宛先サーバー)に復元します。 これにより、復元が正常に機能することを確認できます。

次のコマンドを使用して、既存のKafkaデータディレクトリを安全に削除できます。

  1. rm -r /tmp/kafka-logs/*

データを削除したので、Kafkaのインストールは、トピックやメッセージが含まれていない新しいインストールに似ています。 バックアップしたデータを復元するには、次のコマンドを実行してファイルを抽出します。

  1. tar -C /tmp/kafka-logs -xzf /home/kafka/kafka-backup.tar.gz --strip-components 2

The -C フラグが伝えます tar ディレクトリに移動します /tmp/kafka-logs データを抽出する前に。 指定します --strip 2 アーカイブのコンテンツがで抽出されることを確認するフラグ /tmp/kafka-logs/ それ自体であり、別のディレクトリ(たとえば /tmp/kafka-logs/kafka-logs/)その中。

データが正常に抽出されたので、次のように入力して、KafkaおよびZooKeeperサービスを再開できます。 exit、root以外のsudoユーザーに切り替えて、次のコマンドを実行します。

  1. sudo systemctl start kafka

次のコマンドでZooKeeperサービスを開始します。

  1. sudo systemctl start zookeeper

kafkaユーザーとして再度ログインします。

  1. sudo -iu kafka

復元しました kafka データについては、次のセクションで復元が成功したことの確認に進むことができます。

ステップ6—復元の確認

Kafkaデータの復元をテストするには、手順1で作成したトピックからのメッセージを使用します。

Kafkaが起動するまで数分待ってから、次のコマンドを実行して、からのメッセージを読み取ります。 BackupTopic:

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

次のような警告が表示された場合は、Kafkaが完全に起動するのを待つ必要があります。

Output
[2018-09-13 15:52:45,234] WARN [Consumer clientId=consumer-1, groupId=console-consumer-87747] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

さらに数分後に前のコマンドを再試行するか、実行してください sudo systemctl restart kafka root以外のsudoユーザーとして。 復元に問題がない場合は、次の出力が表示されます。

Output
Test Message 1

このメッセージが表示されない場合は、前のセクションのコマンドを見逃していないかどうかを確認して実行できます。

復元されたKafkaデータを確認したので、これは、単一のKafkaインストールでデータを正常にバックアップおよび復元したことを意味します。 手順7に進んで、クラスターとトピックのデータを別のサーバーのインストールに移行する方法を確認できます。

ステップ7—バックアップを別のKafkaサーバーに移行および復元する(オプション)

このセクションでは、バックアップされたデータをソースのKafkaサーバーから宛先のKafkaサーバーに移行します。 そのためには、最初に scp 圧縮されたものをダウンロードするコマンド tar.gz ローカルシステムへのファイル。 次に、を使用します scp 再度、ファイルを宛先サーバーにプッシュします。 ファイルが移行先サーバーに存在するようになったら、前に使用した手順に従ってバックアップを復元し、移行が成功したことを確認できます。

バックアップファイルをローカルにダウンロードしてから、ソースサーバーから宛先サーバーに直接コピーするのではなく、宛先サーバーにアップロードします。これは、宛先サーバーにソースサーバーのSSHキーが含まれていないためです。 /home/sammy/.ssh/authorized_keys ファイルであり、ソースサーバーとの間で接続できません。 ただし、ローカルマシンは両方のサーバーに接続できるため、送信元サーバーから宛先サーバーへのSSHアクセスを設定する追加の手順を省くことができます。

ダウンロード zookeeper-backup.tar.gzkafka-backup.tar.gz 次のコマンドを実行して、ファイルをローカルマシンに送信します。

  1. scp sammy@source_server_ip:/home/kafka/zookeeper-backup.tar.gz .

次のような出力が表示されます。

Output
zookeeper-backup.tar.gz 100% 68KB 128.0KB/s 00:00

次のコマンドを実行して、 kafka-backup.tar.gz ローカルマシンへのファイル:

  1. scp sammy@source_server_ip:/home/kafka/kafka-backup.tar.gz .

次の出力が表示されます。

Output
kafka-backup.tar.gz 100% 1031KB 488.3KB/s 00:02

走る ls ローカルマシンの現在のディレクトリに、両方のファイルが表示されます。

Output
kafka-backup.tar.gz zookeeper.tar.gz

次のコマンドを実行して、 zookeeper-backup.tar.gz にファイルする /home/kafka/ 宛先サーバーの:

  1. scp zookeeper-backup.tar.gz sammy@destination_server_ip:/home/sammy/zookeeper-backup.tar.gz

次のコマンドを実行して、 kafka-backup.tar.gz にファイルする /home/kafka/ 宛先サーバーの:

  1. scp kafka-backup.tar.gz sammy@destination_server_ip:/home/sammy/kafka-backup.tar.gz

バックアップファイルが宛先サーバーに正常にアップロードされました。 ファイルはにあるので /home/sammy/ ディレクトリがあり、 kafka ユーザーによるアクセスのための適切な権限がない場合は、ファイルをに移動できます。 /home/kafka/ ディレクトリを作成し、権限を変更します。

次のコマンドを実行して、宛先サーバーにSSHで接続します。

  1. ssh sammy@destination_server_ip

今移動します zookeeper-backup.tar.gz/home/kafka/ 実行することによって:

  1. sudo mv zookeeper-backup.tar.gz /home/sammy/zookeeper-backup.tar.gz

同様に、次のコマンドを実行してコピーします kafka-backup.tar.gz/home/kafka/:

  1. sudo mv kafka-backup.tar.gz /home/kafka/kafka-backup.tar.gz

次のコマンドを実行して、バックアップファイルの所有者を変更します。

  1. sudo chown kafka /home/kafka/zookeeper-backup.tar.gz /home/kafka/kafka-backup.tar.gz

以前 mvchown コマンドは出力を表示しません。

バックアップファイルが宛先サーバーの正しいディレクトリにあるので、このチュートリアルのステップ4から6にリストされているコマンドに従って、宛先サーバーのデータを復元および確認します。

結論

このチュートリアルでは、Kafkaのトピックとメッセージを、同じインストールと別々のサーバーへのインストールの両方からバックアップ、インポート、および移行しました。 Kafkaの他の便利な管理タスクについて詳しく知りたい場合は、Kafkaの公式ドキュメントの操作セクションを参照してください。

などのバックアップファイルを保存するには zookeeper-backup.tar.gzkafka-backup.tar.gz リモートで、デジタルオーシャンスペースを探索できます。 Kafkaがサーバーで実行されている唯一のサービスである場合は、フルインスタンスバックアップなどの他のバックアップ方法を調べることもできます。