開発者ドキュメント

Ubuntu20.04にApacheKafkaをインストールする方法

著者は、 Write for DOnations プログラムの一環として、 Free and Open SourceFundを選択して寄付を受け取りました。

序章

Apache Kafka は、大量のリアルタイムデータを処理するように設計された人気のある分散メッセージブローカーです。 Kafkaクラスターは、拡張性が高く、フォールトトレラントです。 また、ActiveMQRabbitMQなどの他のメッセージブローカーと比較してスループットがはるかに高くなっています。 一般的にパブリッシュ/サブスクライブメッセージングシステムとして使用されますが、パブリッシュされたメッセージの永続ストレージを提供するため、多くの組織でもログ集約に使用されます。

パブリッシュ/サブスクライブメッセージングシステムを使用すると、1つ以上のプロデューサーが、コンシューマーの数やメッセージの処理方法を考慮せずにメッセージをパブリッシュできます。 購読しているクライアントには、更新と新しいメッセージの作成について自動的に通知されます。 このシステムは、クライアントが定期的にポーリングして新しいメッセージが利用可能かどうかを判断するシステムよりも効率的でスケーラブルです。

このチュートリアルでは、Ubuntu20.04にApacheKafka2.6.3をインストールして使用します。

前提条件

フォローするには、次のものが必要です。

ステップ1—Kafkaのユーザーを作成する

Kafkaはネットワーク経由でリクエストを処理できるため、最初のステップはサービス専用のユーザーを作成することです。 これにより、誰かがKafkaサーバーを侵害した場合のUbuntuマシンへのダメージを最小限に抑えることができます。 このステップでは、専用のkafkaユーザーを作成します。

root以外のsudoユーザーとしてログインし、kafkaというユーザーを作成します。

  1. sudo adduser kafka

プロンプトに従ってパスワードを設定し、kafkaユーザーを作成します。

次に、adduserコマンドを使用して、kafkaユーザーをsudoグループに追加します。 Kafkaの依存関係をインストールするには、次の権限が必要です。

  1. sudo adduser kafka sudo

これで、kafkaユーザーの準備が整いました。 suを使用してアカウントにログインします。

  1. su -l kafka

Kafka固有のユーザーを作成したので、Kafkaバイナリをダウンロードして抽出する準備が整いました。

ステップ2—Kafkaバイナリのダウンロードと抽出

Kafkaバイナリをダウンロードして、kafkaユーザーのホームディレクトリの専用フォルダに抽出してみましょう。

まず、/home/kafkaDownloadsというディレクトリを作成してダウンロードを保存します。

  1. mkdir ~/Downloads

curlを使用して、Kafkaバイナリをダウンロードします。

  1. curl "https://downloads.apache.org/kafka/2.6.3/kafka_2.13-2.6.3.tgz" -o ~/Downloads/kafka.tgz

kafkaというディレクトリを作成し、このディレクトリに移動します。 これは、Kafkaインストールのベースディレクトリになります。

  1. mkdir ~/kafka && cd ~/kafka

tarコマンドを使用してダウンロードしたアーカイブを抽出します。

  1. tar -xvzf ~/Downloads/kafka.tgz --strip 1

--strip 1フラグを指定して、アーカイブの内容が~/kafka/自体に抽出され、アーカイブ内の別のディレクトリ(~/kafka/kafka_2.13-2.6.3/など)には抽出されないようにします。

バイナリのダウンロードと抽出が正常に完了したので、Kafkaサーバーの構成を開始できます。

ステップ3—Kafkaサーバーの構成

Kafkaのデフォルトの動作では、トピックを削除できません。 Kafkaトピックは、メッセージを公開できるカテゴリ、グループ、またはフィード名です。 これを変更するには、構成ファイルを編集する必要があります。

Kafkaの構成オプションは、server.propertiesで指定されています。 nanoまたはお気に入りのエディターでこのファイルを開きます。

  1. nano ~/kafka/config/server.properties

まず、Kafkaトピックを削除できるようにする設定を追加します。 ファイルの最後に以下を追加します。

〜/ kafka / config / server.properties
delete.topic.enable = true

次に、logs.dirプロパティを変更して、Kafkaログが保存されているディレクトリを変更します。

〜/ kafka / config / server.properties
log.dirs=/home/kafka/logs

ファイルを保存して閉じます。 Kafkaを構成したので、次のステップは、起動時にKafkaサーバーを実行して有効にするためのsystemdユニットファイルを作成することです。

ステップ4—Systemdユニットファイルの作成とKafkaサーバーの起動

このセクションでは、Kafkaサービス用のsystemdユニットファイルを作成します。 これは、他のLinuxサービスと一貫した方法で、Kafkaの開始、停止、再起動などの一般的なサービスアクションを実行するのに役立ちます。

Zookeeperは、Kafkaがクラスターの状態と構成を管理するために使用するサービスです。 多くの分散システムで使用されています。 詳細については、公式のZookeeperドキュメントにアクセスしてください。

zookeeperのユニットファイルを作成します。

  1. sudo nano /etc/systemd/system/zookeeper.service

次の単位定義をファイルに入力します。

/etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]セクションでは、Zookeeperを起動する前に、ネットワークとファイルシステムの準備ができている必要があることを指定しています。

[Service]セクションでは、systemdがサービスの開始と停止にzookeeper-server-start.shおよびzookeeper-server-stop.shシェルファイルを使用する必要があることを指定しています。 また、Zookeeperが異常終了した場合に再起動する必要があることも指定しています。

このコンテンツを追加したら、ファイルを保存して閉じます。

次に、kafkaのsystemdサービスファイルを作成します。

  1. sudo nano /etc/systemd/system/kafka.service

次の単位定義をファイルに入力します。

/etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit]セクションは、このユニットファイルがzookeeper.serviceに依存することを指定します。 これにより、kafkaサービスの開始時にzookeeperが自動的に開始されます。

[Service]セクションでは、systemdがサービスの開始と停止にkafka-server-start.shおよびkafka-server-stop.shシェルファイルを使用する必要があることを指定しています。 また、Kafkaが異常終了した場合に再起動する必要があることも指定しています。

ユニットを定義したので、次のコマンドでKafkaを起動します。

  1. sudo systemctl start kafka

サーバーが正常に起動したことを確認するには、kafkaユニットのジャーナルログを確認します。

  1. sudo systemctl status kafka

次のような出力が表示されます。

Output
● kafka.service Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled) Active: active (running) since Wed 2021-02-10 00:09:38 UTC; 1min 58s ago Main PID: 55828 (sh) Tasks: 67 (limit: 4683) Memory: 315.8M CGroup: /system.slice/kafka.service ├─55828 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1 └─55829 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=> Feb 10 00:09:38 cart-67461-1 systemd[1]: Started kafka.service.

これで、ポート9092でリッスンしているKafkaサーバーができました。

kafkaサービスを開始しました。 ただし、サーバーを再起動した場合、Kafkaは自動的に再起動しません。 サーバーの起動時にkafkaサービスを有効にするには、次のコマンドを実行します。

  1. sudo systemctl enable zookeeper
  2. sudo systemctl enable kafka

このステップでは、kafkaおよびzookeeperサービスを開始して有効にしました。 次のステップでは、Kafkaのインストールを確認します。

ステップ5—Kafkaインストールのテスト

このステップでは、Kafkaのインストールをテストします。 具体的には、「HelloWorld」メッセージを公開して消費し、Kafkaサーバーが正しく動作していることを確認します。

Kafkaでメッセージを公開するには、次のものが必要です。

まず、TutorialTopicという名前のトピックを作成します。

  1. ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

kafka-console-producer.shスクリプトを使用して、コマンドラインからプロデューサーを作成できます。 Kafkaサーバーのホスト名、ポート、およびトピックを引数として想定しています。

次に、文字列"Hello, World"TutorialTopicトピックに公開します。

  1. echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

次に、kafka-console-consumer.shスクリプトを使用してKafkaコンシューマーを作成します。 ZooKeeperサーバーのホスト名とポート、および引数としてのトピック名が必要です。

次のコマンドは、TutorialTopicからのメッセージを消費します。 --from-beginningフラグの使用に注意してください。これにより、コンシューマーが開始される前に公開されたメッセージを使用できます。

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

構成に問題がない場合は、端末にHello, Worldが表示されます。

Output
Hello, World

スクリプトは引き続き実行され、さらにメッセージが公開されるのを待ちます。 これをテストするには、新しいターミナルウィンドウを開き、サーバーにログインします。

この新しいターミナルで、プロデューサーを起動して別のメッセージを公開します。

  1. echo "Hello World from Sammy at DigitalOcean!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

コンシューマーの出力に次のメッセージが表示されます。

Output
Hello, World Hello World from Sammy at DigitalOcean!

テストが終了したら、CTRL+Cを押してコンシューマスクリプトを停止します。

これで、Ubuntu20.04にKafkaサーバーをインストールして構成しました。 次のステップでは、Kafkaサーバーのセキュリティを強化するためのいくつかの簡単なタスクを実行します。

ステップ6—Kafkaサーバーを強化する

インストールが完了したら、kafkaユーザーの管理者権限を削除できます。 その前に、root以外のsudoユーザーとしてログアウトして再度ログインしてください。 このチュートリアルを開始したときと同じシェルセッションをまだ実行している場合は、exitと入力します。

kafkaユーザーをsudoグループから削除します。

  1. sudo deluser kafka sudo

Kafkaサーバーのセキュリティをさらに向上させるには、passwdコマンドを使用してkafkaユーザーのパスワードをロックします。 これにより、誰もこのアカウントを使用してサーバーに直接ログインできないようになります。

  1. sudo passwd kafka -l

この時点で、rootまたはsudoユーザーのみが次のコマンドを入力してkafkaとしてログインできます。

  1. sudo su - kafka

将来、ロックを解除する場合は、passwd-uオプションとともに使用します。

  1. sudo passwd kafka -u

これで、kafkaユーザーの管理者権限が正常に制限されました。 Kafkaの使用を開始する準備ができました。または、次のオプションの手順に従って、KafkaTをシステムに追加します。

ステップ7— KafkaTのインストール(オプション)

KafkaTはAirbnbが開発したツールです。 Kafkaクラスターの詳細を表示し、コマンドラインから特定の管理タスクを実行するのが簡単になります。 ただし、Ruby gemであるため、使用するにはRubyが必要です。 KafkaTが依存する他のgemをビルドするには、build-essentialパッケージも必要になります。 aptを使用してそれらをインストールします。

  1. sudo apt install ruby ruby-dev build-essential

これで、gemコマンドを使用してKafkaTをインストールできます。

  1. sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

kafkatのインストールプロセス中のZookeeperの警告とエラーを抑制するには、「Wno-error=format-overflow」コンパイルフラグが必要です。

KafkaTは、構成ファイルとして.kafkatcfgを使用して、Kafkaサーバーのインストールディレクトリとログディレクトリを決定します。 また、KafkaTをZooKeeperインスタンスにポイントするエントリが必要です。

.kafkatcfgという名前の新しいファイルを作成します。

  1. nano ~/.kafkatcfg

次の行を追加して、KafkaサーバーとZookeeperインスタンスに関する必要な情報を指定します。

〜/ .kafkatcfg
{
  "kafka_path": "~/kafka",
  "log_path": "/home/kafka/logs",
  "zk_path": "localhost:2181"
}

これで、KafkaTを使用する準備が整いました。 まず、これを使用してすべてのKafkaパーティションの詳細を表示する方法を次に示します。

  1. kafkat partitions

次の出力が表示されます。

Output
[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible. /var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated ... Topic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] ... ...

TutorialTopicと、Kafkaがクライアント関連情報を保存するために使用する内部トピックである__consumer_offsetsが表示されます。 __consumer_offsetsで始まる行は無視してかまいません。

KafkaTの詳細については、KafkaTのGitHubリポジトリを参照してください。

結論

これで、UbuntuサーバーでApacheKafkaが安全に実行されます。 Kafkaクライアントを使用して、Kafkaをお気に入りのプログラミング言語に統合できるようになりました。

Kafkaの詳細については、Kafkaのドキュメントを参照することもできます。

モバイルバージョンを終了