Debian10にApacheKafkaをインストールする方法
序章
Apache Kafka は、大量のリアルタイムデータを処理するように設計された人気の分散メッセージブローカーです。 Kafkaクラスターは、拡張性とフォールトトレラント性が高く、ActiveMQやRabbitMQなどの他のメッセージブローカーと比較してスループットがはるかに高くなっています。 一般にパブリッシュ/サブスクライブメッセージングシステムとして使用されますが、パブリッシュされたメッセージの永続的なストレージを提供するため、多くの組織でもログ集約に使用されます。
パブリッシュ/サブスクライブメッセージングシステムを使用すると、1つ以上のプロデューサーが、コンシューマーの数やメッセージの処理方法を考慮せずにメッセージをパブリッシュできます。 購読しているクライアントには、更新と新しいメッセージの作成について自動的に通知されます。 このシステムは、クライアントが定期的にポーリングして新しいメッセージが利用可能かどうかを判断するシステムよりも効率的でスケーラブルです。
このチュートリアルでは、Apache Kafka2.1.1をDebian10サーバーに安全にインストールして構成し、セットアップをテストして、 Hello World
メッセージ。 次に、オプションで KafkaT をインストールして、Kafkaを監視し、Kafkaマルチノードクラスターをセットアップします。
前提条件
フォローするには、次のものが必要です。
- 少なくとも4GBのRAMを備えた1台のDebian10サーバーと、sudo権限を持つ非rootユーザー。 root以外のユーザーを設定していない場合は、 Debian10の初期サーバー設定ガイドで指定されている手順に従ってください。
- OpenJDK11がサーバーにインストールされています。 このバージョンをインストールするには、OpenJDKの特定のバージョンをインストールする際の Debian10にAptを使用してJavaをインストールする方法の指示に従ってください。 KafkaはJavaで記述されているため、JVMが必要です。
注: 4GBのRAMを使用せずにインストールすると、Kafkaサービスが失敗し、 Java仮想マシン(JVM)が Out Of Memory
起動時の例外。
ステップ1—Kafkaのユーザーを作成する
Kafkaはネットワーク経由でリクエストを処理できるため、専用のユーザーを作成することをお勧めします。 これにより、Kafkaサーバーが危険にさらされた場合のDebianマシンへの損傷が最小限に抑えられます。 このステップでは、専用ユーザーkafkaを作成します。
root以外のsudoユーザーとしてログインし、というユーザーを作成します kafka
とともに useradd
指図:
- sudo useradd kafka -m
The -m
フラグは、ユーザーのホームディレクトリが作成されることを保証します。 このホームディレクトリ、 /home/kafka
は、後でコマンドを実行するためのワークスペースディレクトリとして機能します。
を使用してパスワードを設定します passwd
:
- sudo passwd kafka
このユーザーに使用するパスワードを入力します。
次に、kafkaユーザーをに追加します sudo
とのグループ adduser
コマンド。Kafkaの依存関係をインストールするために必要な権限があります。
- sudo adduser kafka sudo
これで、kafkaユーザーの準備が整いました。 を使用してこのアカウントにログインします su
:
- su -l kafka
Kafka固有のユーザーを作成したので、Kafkaバイナリのダウンロードと抽出に進むことができます。
ステップ2—Kafkaバイナリのダウンロードと抽出
この手順では、Kafkaバイナリをダウンロードして、kafkaユーザーのホームディレクトリの専用フォルダに抽出します。
まず、にディレクトリを作成します /home/kafka
と呼ばれる Downloads
ダウンロードを保存するには:
- mkdir ~/Downloads
次に、インストールします curl
を使用して apt-get
リモートファイルをダウンロードできるようにするには、次のようにします。
- sudo apt-get update && sudo apt-get install curl
プロンプトが表示されたら、次のように入力します Y
確認するには curl
ダウンロード。
一度 curl
がインストールされている場合は、それを使用してKafkaバイナリをダウンロードします。
- curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
と呼ばれるディレクトリを作成します kafka
このディレクトリに移動します。 これは、Kafkaインストールのベースディレクトリになります。
- mkdir ~/kafka && cd ~/kafka
を使用してダウンロードしたアーカイブを抽出します tar
指図:
- tar -xvzf ~/Downloads/kafka.tgz --strip 1
指定しました --strip 1
アーカイブのコンテンツがで抽出されることを確認するフラグ ~/kafka/
それ自体であり、その中の別のディレクトリにはありません。 ~/kafka/kafka_2.12-2.1.1/
.
バイナリを正常にダウンロードして抽出したので、トピックを削除できるようにKafkaの構成に進むことができます。
ステップ3—Kafkaサーバーの構成
Kafkaのデフォルトの動作では、トピック、メッセージを公開できるカテゴリ、グループ、またはフィード名を削除できません。 これを変更するには、構成ファイルを編集します。
Kafkaの構成オプションはで指定されています server.properties
. このファイルをで開く nano
またはお気に入りの編集者:
- nano ~/kafka/config/server.properties
Kafkaトピックを削除できるようにする設定を追加しましょう。 次の強調表示された行をファイルの最後に追加します。
...
group.initial.rebalance.delay.ms
delete.topic.enable = true
ファイルを保存して終了します nano
. Kafkaを構成したので、次のように作成できます。 systemd
起動時にKafkaを実行および有効化するためのユニットファイル。
ステップ4—Systemdユニットファイルの作成とKafkaサーバーの起動
このセクションでは、Kafkaサービス用のsystemdユニットファイルを作成します。 これは、他のLinuxサービスと一貫した方法で、Kafkaの開始、停止、再起動などの一般的なサービスアクションを実行するのに役立ちます。
ZooKeeperは、Kafkaがクラスターの状態と構成を管理するために使用するサービスです。 これは、分散システムで不可欠なコンポーネントとして一般的に使用されます。 このチュートリアルでは、Zookeeperを使用してKafkaのこれらの側面を管理します。 詳細については、公式のZooKeeperドキュメントをご覧ください。
まず、のユニットファイルを作成します zookeeper
:
- sudo nano /etc/systemd/system/zookeeper.service
次のユニット定義をファイルに入力します。
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit]
セクションでは、ZooKeeperがネットワークを必要とし、ファイルシステムを起動する前に準備ができている必要があることを指定しています。
The [Service]
セクションはそれを指定します systemd
を使用する必要があります zookeeper-server-start.sh
と zookeeper-server-stop.sh
サービスを開始および停止するためのシェルファイル。 また、ZooKeeperが異常終了した場合に自動的に再起動する必要があることも指定しています。
次に、を作成します systemd
のサービスファイル kafka
:
- sudo nano /etc/systemd/system/kafka.service
次のユニット定義をファイルに入力します。
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit]
セクションは、このユニットファイルが依存することを指定します zookeeper.service
. これにより、 zookeeper
が自動的に開始される kafka
サービスを開始します。
The [Service]
セクションはそれを指定します systemd
を使用する必要があります kafka-server-start.sh
と kafka-server-stop.sh
サービスを開始および停止するためのシェルファイル。 また、Kafkaが異常終了した場合に自動的に再起動する必要があることも指定しています。
ユニットが定義されたので、次のコマンドでKafkaを起動します。
- sudo systemctl start kafka
サーバーが正常に起動したことを確認するには、ジャーナルログで kafka
単位:
- sudo journalctl -u kafka
次のような出力が表示されます。
OutputMar 23 13:31:48 kafka systemd[1]: Started kafka.service.
これで、Kafkaサーバーがポートでリッスンします 9092
、これはKafkaのデフォルトポートです。
あなたは始めました kafka
サービスですが、サーバーを再起動した場合、サーバーはまだ自動的に開始されません。 有効にする kafka
サーバーの起動時に、次を実行します。
- sudo systemctl enable kafka
サービスを開始して有効にしたので、次はインストールを確認します。
ステップ5—インストールのテスト
公開して消費しましょう Hello World
Kafkaサーバーが正しく動作していることを確認するためのメッセージ。 Kafkaでメッセージを公開するには、次のものが必要です。
- プロデューサー。トピックへのレコードとデータの公開を可能にします。
- コンシューマー。トピックからメッセージとデータを読み取ります。
まず、という名前のトピックを作成します TutorialTopic
次のように入力します。
- ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
コマンドラインからプロデューサーを作成するには、 kafka-console-producer.sh
脚本。 Kafkaサーバーのホスト名、ポート、およびトピック名を引数として想定しています。
文字列を公開する Hello, World
に TutorialTopic
次のように入力してトピックを入力します。
- echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
The --broker-list
フラグは、この場合、メッセージを送信するメッセージブローカーのリストを決定します localhost:9092
. --topic
トピックをとして指定します TutorialTopic
.
次に、を使用してKafkaコンシューマーを作成できます kafka-console-consumer.sh
脚本。 ZooKeeperサーバーのホスト名とポート、およびトピック名を引数として想定しています。
次のコマンドは、からのメッセージを消費します TutorialTopic
. の使用に注意してください --from-beginning
フラグ。コンシューマーが開始される前に公開されたメッセージの消費を許可します。
- ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server `localhost:9092` --topic TutorialTopic --from-beginning
--bootstrap-server
Kafkaクラスターへの入力のリストを提供します。 この場合、使用しています localhost:9092
.
あなたが見るでしょう Hello, World
ターミナルで:
OutputHello, World
スクリプトは引き続き実行され、トピックにさらにメッセージが公開されるのを待ちます。 新しいターミナルを開いてプロデューサーを開始し、さらにいくつかのメッセージを公開してください。 コンシューマーの出力でそれらすべてを確認できるはずです。 Kafkaの使用方法について詳しく知りたい場合は、公式のKafkaドキュメントを参照してください。
テストが終了したら、を押します CTRL+C
コンシューマスクリプトを停止します。 インストールをテストしたので、Kafkaクラスターをより適切に管理するためにKafkaTのインストールに進むことができます。
ステップ6— KafkaTのインストール(オプション)
KafkaT はAirbnbのツールで、Kafkaクラスターの詳細を表示したり、コマンドラインから特定の管理タスクを実行したりするのが簡単になります。 Ruby gemなので、使用するにはRubyが必要です。 また、 build-essential
それが依存する他の宝石を構築できるようにするためのパッケージ。 を使用してそれらをインストールします apt
:
- sudo apt install ruby ruby-dev build-essential
これで、KafkaTを使用してインストールできます。 gem
指図:
- sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
The CFLAGS=-Wno-error=format-overflow
オプションはフォーマットオーバーフロー警告を無効にし、KafkaTの依存関係であるZooKeepergemに必要です。
KafkaTは .kafkatcfg
Kafkaサーバーのインストールおよびログディレクトリを決定するための構成ファイルとして。 また、KafkaTをZooKeeperインスタンスにポイントするエントリが必要です。
と呼ばれる新しいファイルを作成します .kafkatcfg
:
- nano ~/.kafkatcfg
次の行を追加して、KafkaサーバーとZookeeperインスタンスに関する必要な情報を指定します。
{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}
これで、KafkaTを使用する準備が整いました。 まず、これを使用してすべてのKafkaパーティションの詳細を表示する方法を次に示します。
- kafkat partitions
次の出力が表示されます。
OutputTopic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
この出力は TutorialTopic
、 としても __consumer_offsets
、クライアント関連情報を保存するためにKafkaによって使用される内部トピック。 で始まる行は無視してかまいません __consumer_offsets
.
KafkaTの詳細については、KafkaTのGitHubリポジトリを参照してください。
KafkaTをインストールしたので、オプションでDebian 10サーバーのクラスターにKafkaをセットアップして、マルチノードクラスターを作成できます。
ステップ7—マルチノードクラスターのセットアップ(オプション)
より多くのDebian10サーバーを使用してマルチブローカークラスターを作成する場合は、新しい各マシンでステップ1、ステップ4、およびステップ5を繰り返します。 さらに、次の変更を加えます。 ~/kafka/config/server.properties
それぞれのファイル:
-
の値を変更します
broker.id
クラスタ全体で一意になるようなプロパティ。 このプロパティは、クラスター内の各サーバーを一意に識別し、その値として任意の文字列を持つことができます。 例えば、"server1"
,"server2"
などは、識別子として役立ちます。 -
の値を変更します
zookeeper.connect
すべてのノードが同じZooKeeperインスタンスを指すようにプロパティ。 このプロパティは、ZooKeeperインスタンスのアドレスを指定し、<HOSTNAME/IP_ADDRESS>:<PORT>
フォーマット。 このチュートリアルでは、your_first_server_IP:2181
、交換your_first_server_IP
すでに設定したDebian10サーバーのIPアドレスを使用します。
クラスタに複数のZooKeeperインスタンスが必要な場合は、 zookeeper.connect
各ノードのプロパティは、すべてのZooKeeperインスタンスのIPアドレスとポート番号をリストする同一のコンマ区切りの文字列である必要があります。
注:ZookeeperがインストールされているDebian10サーバーでファイアウォールがアクティブになっている場合は、必ずポートを開いてください 2181
クラスタ内の他のノードからの着信要求を許可します。
ステップ8—Kafkaユーザーを制限する
すべてのインストールが完了したので、を削除できます kafka
ユーザーの管理者権限。 その前に、root以外のsudoユーザーとしてログアウトして再度ログインしてください。 このチュートリアルを開始したのと同じシェルセッションをまだ実行している場合は、次のように入力します。 exit
.
を削除します kafka
sudoグループのユーザー:
- sudo deluser kafka sudo
Kafkaサーバーのセキュリティをさらに向上させるには、 kafka
を使用したユーザーのパスワード passwd
指図。 これにより、誰もこのアカウントを使用してサーバーに直接ログインできないようになります。
- sudo passwd kafka -l
この時点で、rootまたはsudoユーザーのみが次のようにログインできます。 kafka
次のコマンドを入力します。
- sudo su - kafka
今後、ロックを解除したい場合は、 passwd
とともに -u
オプション:
- sudo passwd kafka -u
これで、 kafka
ユーザーの管理者権限。
結論
これで、Debianサーバー上でApacheKafkaが安全に実行されました。 ほとんどのプログラミング言語で利用可能なKafkaクライアントを使用してKafkaプロデューサーとコンシューマーを作成することにより、プロジェクトでそれを利用できます。 Kafkaの詳細については、ApacheKafkaのドキュメントを参照することもできます。