序章

Apache Kafka は、大量のリアルタイムデータを効率的に処理するように設計された人気の分散メッセージブローカーです。 Kafkaクラスターは、拡張性とフォールトトレラント性が高いだけでなく、ActiveMQRabbitMQなどの他のメッセージブローカーと比較してスループットがはるかに高くなっています。 一般にパブリッシュ/サブスクライブメッセージングシステムとして使用されますが、パブリッシュされたメッセージの永続的なストレージを提供するため、多くの組織でもログ集約に使用されます。

パブリッシュ/サブスクライブメッセージングシステムを使用すると、1つ以上のプロデューサーが、コンシューマーの数やメッセージの処理方法を考慮せずにメッセージをパブリッシュできます。 購読しているクライアントには、更新と新しいメッセージの作成について自動的に通知されます。 このシステムは、クライアントが定期的にポーリングして新しいメッセージが利用可能かどうかを判断するシステムよりも効率的でスケーラブルです。

このチュートリアルでは、Debian9にApacheKafka2.1.1をインストールして使用します。

前提条件

フォローするには、次のものが必要です。

  • 1台のDebian9サーバーとsudo権限を持つroot以外のユーザー。 root以外のユーザーを設定していない場合は、このガイドで指定されている手順に従ってください。
  • サーバー上に少なくとも4GBのRAM。 この量のRAMを使用せずにインストールすると、Kafkaサービスが失敗し、 Java仮想マシン(JVM)が起動時に「メモリ不足」例外をスローする可能性があります。
  • OpenJDK8がサーバーにインストールされています。 このバージョンをインストールするには、OpenJDKの特定のバージョンをインストールする際のこれらの手順に従ってください。 KafkaはJavaで記述されているため、JVMが必要です。 ただし、その起動シェルスクリプトにはバージョン検出のバグがあり、8を超えるJVMバージョンでは起動に失敗します。

ステップ1—Kafkaのユーザーを作成する

Kafkaはネットワーク経由でリクエストを処理できるため、専用のユーザーを作成する必要があります。 これにより、Kafkaサーバーが危険にさらされた場合のDebianマシンへの損傷が最小限に抑えられます。 このステップでは専用のkafkaユーザーを作成しますが、Kafkaのセットアップが完了したら、このサーバーで他のタスクを実行するために別の非rootユーザーを作成する必要があります。

root以外のsudoユーザーとしてログインし、kafkaというユーザーを作成します。 useradd 指図:

  1. sudo useradd kafka -m

The -m フラグは、ユーザーのホームディレクトリが作成されることを保証します。 このホームディレクトリ、 /home/kafkaは、以下のセクションのコマンドを実行するためのワークスペースディレクトリとして機能します。

を使用してパスワードを設定します passwd:

  1. sudo passwd kafka

kafkaユーザーをに追加します sudo とのグループ adduser コマンド。Kafkaの依存関係をインストールするために必要な権限があります。

  1. sudo adduser kafka sudo

これで、kafkaユーザーの準備が整いました。 を使用してこのアカウントにログインします su:

  1. su -l kafka

Kafka固有のユーザーを作成したので、Kafkaバイナリのダウンロードと抽出に進むことができます。

ステップ2—Kafkaバイナリのダウンロードと抽出

Kafkaバイナリをダウンロードして、kafkaユーザーのホームディレクトリの専用フォルダに抽出してみましょう。

まず、にディレクトリを作成します /home/kafka と呼ばれる Downloads ダウンロードを保存するには:

  1. mkdir ~/Downloads

インストール curl を使用して apt-get リモートファイルをダウンロードできるようにするには、次のようにします。

  1. sudo apt-get update && sudo apt-get install -y curl

一度 curl がインストールされている場合は、それを使用してKafkaバイナリをダウンロードします。

  1. curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz

と呼ばれるディレクトリを作成します kafka このディレクトリに移動します。 これは、Kafkaインストールのベースディレクトリになります。

  1. mkdir ~/kafka && cd ~/kafka

を使用してダウンロードしたアーカイブを抽出します tar 指図:

  1. tar -xvzf ~/Downloads/kafka.tgz --strip 1

指定します --strip 1 アーカイブのコンテンツがで抽出されることを確認するフラグ ~/kafka/ それ自体であり、別のディレクトリ(たとえば ~/kafka/kafka_2.11-2.1.1/)その中。

バイナリを正常にダウンロードして抽出したので、トピックを削除できるようにKafkaの構成に進むことができます。

ステップ3—Kafkaサーバーの構成

Kafkaのデフォルトの動作では、トピック、メッセージを公開できるカテゴリ、グループ、またはフィード名を削除できません。 これを変更するには、構成ファイルを編集しましょう。

Kafkaの構成オプションはで指定されています server.properties. このファイルをで開く nano またはお気に入りの編集者:

  1. nano ~/kafka/config/server.properties

Kafkaトピックを削除できるようにする設定を追加しましょう。 ファイルの最後に以下を追加します。

〜/ kafka / config / server.properties
delete.topic.enable = true

ファイルを保存して終了します nano. Kafkaを構成したので、実行用のsystemdユニットファイルの作成と起動時の有効化に進むことができます。

ステップ4—Systemdユニットファイルの作成とKafkaサーバーの起動

このセクションでは、Kafkaサービス用のsystemdユニットファイルを作成します。 これは、他のLinuxサービスと一貫した方法で、Kafkaの開始、停止、再起動などの一般的なサービスアクションを実行するのに役立ちます。

ZooKeeperは、Kafkaがクラスターの状態と構成を管理するために使用するサービスです。 これは、多くの分散システムで不可欠なコンポーネントとして一般的に使用されています。 詳細については、公式のZooKeeperドキュメントをご覧ください。

のユニットファイルを作成します zookeeper:

  1. sudo nano /etc/systemd/system/zookeeper.service

次のユニット定義をファイルに入力します。

/etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

The [Unit] セクションでは、ZooKeeperを起動する前に、ネットワークとファイルシステムの準備ができている必要があることを指定しています。

The [Service] セクションは、systemdが使用する必要があることを指定します zookeeper-server-start.shzookeeper-server-stop.sh サービスを開始および停止するためのシェルファイル。 また、ZooKeeperが異常終了した場合に自動的に再起動する必要があることも指定しています。

次に、のsystemdサービスファイルを作成します kafka:

  1. sudo nano /etc/systemd/system/kafka.service

次のユニット定義をファイルに入力します。

/etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

The [Unit] セクションは、このユニットファイルが依存することを指定します zookeeper.service. これにより、 zookeeper が自動的に開始される kafka サービスを開始します。

The [Service] セクションは、systemdが使用する必要があることを指定します kafka-server-start.shkafka-server-stop.sh サービスを開始および停止するためのシェルファイル。 また、Kafkaが異常終了した場合に自動的に再起動する必要があることも指定しています。

ユニットが定義されたので、次のコマンドでKafkaを起動します。

  1. sudo systemctl start kafka

サーバーが正常に起動したことを確認するには、ジャーナルログで kafka 単位:

  1. sudo journalctl -u kafka

次のような出力が表示されます。

Output
Mar 23 13:31:48 kafka systemd[1]: Started kafka.service.

これで、Kafkaサーバーがポートでリッスンします 9092.

私たちが始めている間 kafka サービス、サーバーを再起動した場合、サーバーは自動的に開始されません。 有効にする kafka サーバーの起動時に、次を実行します。

  1. sudo systemctl enable kafka

サービスを開始して有効にしたので、インストールを確認しましょう。

ステップ5—インストールのテスト

「HelloWorld」メッセージを公開して使用し、Kafkaサーバーが正しく動作していることを確認しましょう。 Kafkaでメッセージを公開するには、次のものが必要です。

  • プロデューサー。トピックへのレコードとデータの公開を可能にします。
  • コンシューマー。トピックからメッセージとデータを読み取ります。

まず、という名前のトピックを作成します TutorialTopic 次のように入力します。

  1. ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

コマンドラインからプロデューサーを作成するには、 kafka-console-producer.sh 脚本。 Kafkaサーバーのホスト名、ポート、およびトピック名を引数として想定しています。

文字列を公開する "Hello, World"TutorialTopic 次のように入力してトピックを入力します。

  1. echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

次に、を使用してKafkaコンシューマーを作成できます kafka-console-consumer.sh 脚本。 ZooKeeperサーバーのホスト名とポート、および引数としてのトピック名が必要です。

次のコマンドは、からのメッセージを消費します TutorialTopic. の使用に注意してください --from-beginning フラグ。コンシューマーが開始される前に公開されたメッセージの消費を許可します。

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

構成の問題がない場合は、次のように表示されます。 Hello, World ターミナルで:

Output
Hello, World

スクリプトは引き続き実行され、トピックにさらにメッセージが公開されるのを待ちます。 新しいターミナルを開いてプロデューサーを開始し、さらにいくつかのメッセージを公開してください。 コンシューマーの出力でそれらすべてを確認できるはずです。

テストが終了したら、を押します CTRL+C コンシューマスクリプトを停止します。 インストールをテストしたので、KafkaTのインストールに移りましょう。

ステップ6— KafkaTをインストールします(オプション)

KafkaT はAirbnbのツールで、Kafkaクラスターの詳細を表示したり、コマンドラインから特定の管理タスクを実行したりするのが簡単になります。 これはRubygemであるため、使用するにはRubyが必要になります。 また、 build-essential それが依存する他の宝石を構築できるようにするためのパッケージ。 を使用してそれらをインストールします apt:

  1. sudo apt install ruby ruby-dev build-essential

これで、gemコマンドを使用してKafkaTをインストールできます。

  1. sudo gem install kafkat

KafkaTは .kafkatcfg Kafkaサーバーのインストールおよびログディレクトリを決定するための構成ファイルとして。 また、KafkaTをZooKeeperインスタンスにポイントするエントリが必要です。

と呼ばれる新しいファイルを作成します .kafkatcfg:

  1. nano ~/.kafkatcfg

次の行を追加して、KafkaサーバーとZookeeperインスタンスに関する必要な情報を指定します。

〜/ .kafkatcfg
{
  "kafka_path": "~/kafka",
  "log_path": "/tmp/kafka-logs",
  "zk_path": "localhost:2181"
}

これで、KafkaTを使用する準備が整いました。 まず、これを使用してすべてのKafkaパーティションの詳細を表示する方法を次に示します。

  1. kafkat partitions

次の出力が表示されます。

Output
Topic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] ... ...

あなたが見るでしょう TutorialTopic、 としても __consumer_offsets、クライアント関連情報を保存するためにKafkaによって使用される内部トピック。 で始まる行は無視してかまいません __consumer_offsets.

KafkaTの詳細については、KafkaTのGitHubリポジトリを参照してください。

ステップ7—マルチノードクラスターのセットアップ(オプション)

より多くのDebian9マシンを使用してマルチブローカークラスタを作成する場合は、新しいマシンごとにステップ1、ステップ4、およびステップ5を繰り返す必要があります。 さらに、次の変更を行う必要があります server.properties それぞれのファイル:

  • の値 broker.id プロパティは、クラスター全体で一意になるように変更する必要があります。 このプロパティは、クラスター内の各サーバーを一意に識別し、その値として任意の文字列を持つことができます。 例えば、 "server1", "server2"、など。

  • の値 zookeeper.connect すべてのノードが同じZooKeeperインスタンスを指すように、プロパティを変更する必要があります。 このプロパティは、ZooKeeperインスタンスのアドレスを指定し、 <HOSTNAME/IP_ADDRESS>:<PORT> フォーマット。 例えば、 "203.0.113.0:2181", "203.0.113.1:2181"

クラスタに複数のZooKeeperインスタンスが必要な場合は、 zookeeper.connect 各ノードのプロパティは、すべてのZooKeeperインスタンスのIPアドレスとポート番号をリストする同一のコンマ区切りの文字列である必要があります。

ステップ8—Kafkaユーザーを制限する

すべてのインストールが完了したので、kafkaユーザーの管理者権限を削除できます。 その前に、root以外のsudoユーザーとしてログアウトして再度ログインしてください。 このチュートリアルを開始したのと同じシェルセッションをまだ実行している場合は、次のように入力します。 exit.

kafkaユーザーをsudoグループから削除します。

  1. sudo deluser kafka sudo

Kafkaサーバーのセキュリティをさらに向上させるには、kafkaユーザーのパスワードを passwd 指図。 これにより、誰もこのアカウントを使用してサーバーに直接ログインできないようになります。

  1. sudo passwd kafka -l

この時点で、rootまたはsudoユーザーのみが次のようにログインできます。 kafka 次のコマンドを入力します。

  1. sudo su - kafka

今後、ロックを解除したい場合は、 passwd とともに -u オプション:

  1. sudo passwd kafka -u

これで、kafkaユーザーの管理者権限が正常に制限されました。

結論

これで、Debianサーバー上でApacheKafkaが安全に実行されました。 ほとんどのプログラミング言語で利用可能なKafkaクライアントを使用してKafkaプロデューサーとコンシューマーを作成することにより、プロジェクトでそれを利用できます。 Kafkaの詳細については、Kafkaのドキュメントを参照することもできます。