1. 概要

ApacheKafka®は分散ストリーミングプラットフォームです。 前のチュートリアルで、Springを使用してKafkaのコンシューマーとプロデューサーを実装する方法について説明しました。

このチュートリアルでは、Kafkaコネクタの使用方法を学習します。

以下を見ていきます。

  • さまざまなタイプのKafkaコネクタ
  • KafkaConnectの機能とモード
  • プロパティファイルとRESTAPIを使用したコネクタ構成

2. KafkaConnectおよびKafkaコネクタの基本

Kafka Connectは、いわゆる Connector を使用して、Kafkaをデータベース、Key-Valueストア、検索インデックス、ファイルシステムなどの外部システムに接続するためのフレームワークです。

Kafkaコネクタはすぐに使用できるコンポーネントであり、 外部システムからKafkaトピックにデータをインポートし、Kafkaトピックから外部システムにデータをエクスポートします。 一般的なデータソースとシンクに既存のコネクタ実装を使用することも、独自のコネクタを実装することもできます。

ソースコネクタは、システムからデータを収集します。 ソースシステムは、データベース全体、ストリームテーブル、またはメッセージブローカーにすることができます。 ソースコネクタは、アプリケーションサーバーからKafkaトピックにメトリックを収集して、データを低レイテンシでストリーム処理に利用できるようにすることもできます。

シンクコネクタは、Kafkaトピックから他のシステム(Elasticsearchなどのインデックス、Hadoopなどのバッチシステム、または任意の種類のデータベース)にデータを配信します。

一部のコネクタはコミュニティによって維持されていますが、他のコネクタはConfluentまたはそのパートナーによってサポートされています。 実際、ほんの数例を挙げると、S3、JDBC、Cassandraなどの最も一般的なシステム用のコネクタを見つけることができます。

3. 特徴

KafkaConnectの機能は次のとおりです。

  • 外部システムをKafkaに接続するためのフレームワーク– コネクタの開発、展開、および管理を簡素化します
  • 分散モードとスタンドアロンモード– Kafkaの分散性を活用することで大規模なクラスターを展開するのに役立ちます。また、開発、テスト、小規模な実稼働展開のセットアップも可能です
  • RESTインターフェース–RESTAPIを使用してコネクタを管理できます
  • 自動オフセット管理– Kafka Connectは、の処理に役立ちますオフセットコミットプロセス、これにより、コネクタ開発のこのエラーが発生しやすい部分を手動で実装する手間が省けます
  • デフォルトで分散およびスケーラブル– Kafka Connectは、既存のグループ管理プロトコルを使用します。 KafkaConnectクラスターをスケールアップするためにワーカーを追加できます
  • ストリーミングとバッチ統合– Kafka Connectは、Kafkaの既存の機能に関連してストリーミングとバッチデータシステムをブリッジするための理想的なソリューションです
  • 変換–これらにより、個々のメッセージに簡単で軽量な変更を加えることができます

4. 設定

プレーンなKafkaディストリビューションを使用する代わりに、Kafkaの背後にある会社であるConfluent、Inc.が提供するKafkaディストリビューションであるConfluentPlatformをダウンロードします。 Confluent Platformには、プレーンなKafkaと比較して、いくつかの追加のツールとクライアント、およびいくつかの追加のビルド済みコネクタが付属しています。

私たちの場合、Confluentのサイトにあるオープンソース版で十分です。

5. クイックスタートKafkaConnect

手始めに、Kafka Connectの原理、の最も基本的なコネクタであるファイルソースコネクタとファイルシンクコネクタを使用して説明します。

便利なことに、Confluent Platformには、これらのコネクタとリファレンス構成の両方が付属しています。

5.1. ソースコネクタの構成

ソースコネクタの場合、リファレンス設定は $ CONFLUENT_HOME / etc / kafka /connect-file-source.propertiesで入手できます。

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

この構成には、すべてのソースコネクタに共通するいくつかのプロパティがあります。

  • name は、コネクタインスタンスのユーザー指定の名前です
  • connectedor.class は、実装クラス、基本的にはコネクタの種類を指定します
  • tasks.max は、ソースコネクタのインスタンスをいくつ並行して実行するかを指定します。
  • topic は、コネクタが出力を送信するトピックを定義します

この場合、コネクタ固有の属性もあります。

  • file は、コネクタが入力を読み取るファイルを定義します

これを機能させるために、いくつかのコンテンツを含む基本的なファイルを作成しましょう。

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

作業ディレクトリは$CONFLUENT_HOMEであることに注意してください。

5.2. シンクコネクタの構成

シンクコネクタには、 $ CONFLUENT_HOME / etc / kafka/connect-file-sink.propertiesの参照構成を使用します。

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

論理的には、まったく同じパラメーターが含まれていますが、今回はconnector.classがシンクコネクターの実装を指定し、fileはコネクターがコンテンツを書き込む場所です。

5.3. ワーカー構成

最後に、Connectワーカーを構成する必要があります。これにより、2つのコネクタが統合され、ソースコネクタからの読み取りとシンクコネクタへの書き込みが行われます。

そのために、 $ CONFLUENT_HOME / etc / kafka /connect-standalone.propertiesを使用できます。

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

plugin.path は、コネクタの実装が利用可能なパスのリストを保持できることに注意してください

Kafkaにバンドルされているコネクタを使用するため、plugin.path$CONFLUENT_HOME / share /javaに設定できます。 Windowsを使用する場合、ここに絶対パスを指定する必要がある場合があります。

その他のパラメータについては、デフォルト値のままにしておくことができます。

  • bootstrap.servers には、Kafkaブローカーのアドレスが含まれています
  • key.converterおよびvalue.converterは、データがソースからKafkaに流れ、次にKafkaからシンクに流れるときにデータをシリアル化および逆シリアル化するコンバータークラスを定義します。
  • key.converter.schemas.enableおよびvalue.converter.schemas.enableは、コンバーター固有の設定です
  • offset.storage.file.filename は、Connectをスタンドアロンモードで実行する場合の最も重要な設定です。Connectがオフセットデータを保存する場所を定義します。
  • offset.flush.interval.ms は、ワーカーがタスクのオフセットをコミットしようとする間隔を定義します

また、パラメータのリストはかなり成熟しているため、完全なリストについては、公式ドキュメントを確認してください。

5.4. スタンドアロンモードでのKafkaConnect

これで、最初のコネクタ設定を開始できます。

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

まず、コマンドラインを使用してトピックのコンテンツを検査できます。

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

ご覧のとおり、ソースコネクタは test.txt ファイルからデータを取得し、それをJSONに変換して、Kafkaに送信しました。

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

また、フォルダ $ CONFLUENT_HOME を見ると、ファイルtest.sink.txtがここに作成されていることがわかります。

cat $CONFLUENT_HOME/test.sink.txt
foo
bar

シンクコネクタがpayload属性から値を抽出して宛先ファイルに書き込むため、test.sink.txtのデータには元のテストの内容が含まれます。 txtファイル。

次に、test.txt。に行を追加しましょう。

そうすると、ソースコネクタがこれらの変更を自動的に検出することがわかります。

最後に改行を挿入する必要があるだけです。そうしないと、ソースコネクタは最後の行を考慮しません。

この時点で、接続プロセスを停止しましょう。数行で分散モードで接続を開始します。

6. ConnectのRESTAPI

これまでは、コマンドラインからプロパティファイルを渡すことですべての構成を行いました。 ただし、Connectはサービスとして実行するように設計されているため、RESTAPIも利用できます。

デフォルトでは、 http:// localhost:8083で利用できます。 いくつかのエンドポイントは次のとおりです。

  • GET / connectedors –使用中のすべてのコネクタのリストを返します
  • GET / connectedors / {name} –特定のコネクタに関する詳細を返します
  • POST / attachments –新しいコネクタを作成します。 リクエストの本文は、文字列名フィールドとコネクタ構成パラメーターを含むオブジェクト構成フィールドを含むJSONオブジェクトである必要があります
  • GET / connectedors / {name} / status –コネクタの現在のステータス(実行中、失敗、一時停止のいずれか)、割り当てられているワーカー、失敗した場合のエラー情報、およびすべてのタスクの状態
  • DELETE / connectedors / {name} –コネクタを削除し、すべてのタスクを正常に停止して、その構成を削除します
  • GET / connectedor-plugins – は、KafkaConnectクラスターにインストールされているコネクタープラグインのリストを返します

公式ドキュメントは、すべてのエンドポイントのリストを提供します。

次のセクションでは、RESTAPIを使用して新しいコネクタを作成します。

7. 分散モードでのKafkaConnect

スタンドアロンモードは、開発とテスト、および小規模なセットアップに最適です。 ただし、Kafkaの分散性を最大限に活用するには、Connectを分散モードで起動する必要があります。

そうすることで、コネクタ設定とメタデータがファイルシステムではなくKafkaトピックに保存されます。 その結果、ワーカーノードは実際にはステートレスになります。

7.1. Connectの開始 

分散モードのリファレンス構成は、$ CONFLUENT_HOME/etc/kafka/connect-distributed.properties。にあります。

パラメータは、スタンドアロンモードの場合とほとんど同じです。 いくつかの違いがあります:

  • group.id は、Connectクラスターグループの名前を定義します。 値は、コンシューマーグループIDとは異なる必要があります
  • offset.storage.topic config.storage.topic 、および status.storage.topic は、これらの設定のトピックを定義します。 トピックごとに、レプリケーション係数を定義することもできます

繰り返しになりますが、公式ドキュメントには、すべてのパラメーターのリストが記載されています。

次のように、分散モードでConnectを開始できます。

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

7.2. RESTAPIを使用したコネクタの追加

現在、スタンドアロンのスタートアップコマンドと比較して、引数としてコネクタ構成を渡していません。 代わりに、RESTAPIを使用してコネクタを作成する必要があります。

前の例を設定するには、次のJSON構造体を含む2つのPOSTリクエストを http:// localhost:8083 /connectedorsに送信する必要があります。

まず、ソースコネクタPOSTの本文をJSONファイルとして作成する必要があります。 ここでは、これをconnect-file-source.jsonと呼びます。

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-distributed.txt",
        "topic": "connect-distributed"
    }
}

これが、最初に使用した参照構成ファイルと非常によく似ていることに注意してください。

そしてそれを投稿します:

curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

次に、シンクコネクタに対して同じことを行い、ファイルconnect-file-sink.jsonを呼び出します。

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-distributed.sink.txt",
        "topics": "connect-distributed"
    }
}

そして、以前のようにPOSTを実行します。

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

必要に応じて、この設定が正しく機能していることを確認できます。

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

また、フォルダ $ CONFLUENT_HOME を見ると、ファイルtest-distributed.sink.txtがここに作成されていることがわかります。

cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar

分散セットアップをテストした後、2つのコネクタを削除してクリーンアップしましょう。

curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink

8. データの変換

8.1. サポートされている変換

変換により、個々のメッセージに簡単で軽量な変更を加えることができます。

Kafka Connectは、次の組み込み変換をサポートしています。

  • InsertField –静的データまたはレコードメタデータのいずれかを使用してフィールドを追加します
  • ReplaceField –フィールドのフィルタリングまたは名前変更
  • MaskField –フィールドをタイプの有効なnull値(ゼロや空の文字列など)に置き換えます
  • HoistField –構造体またはマップ内の単一のフィールドとしてイベント全体をラップします
  • ExtractField –構造体とマップから特定のフィールドを抽出し、このフィールドのみを結果に含めます
  • SetSchemaMetadata –スキーマ名またはバージョンを変更します
  • TimestampRouter –元のトピックとタイムスタンプに基づいてレコードのトピックを変更します
  • RegexRouter –元のトピック、置換文字列、および正規表現に基づいてレコードのトピックを変更します

変換は、次のパラメーターを使用して構成されます。

  • transforms –変換用のエイリアスのコンマ区切りリスト
  • transforms。$alias.type –変換のクラス名
  • transforms。$alias。$transformationSpecificConfig –それぞれの変換の構成

8.2. 変圧器の適用

いくつかの変換機能をテストするために、次の2つの変換を設定しましょう。

  • まず、メッセージ全体をJSON構造体としてラップしましょう
  • その後、その構造体にフィールドを追加しましょう

変換を適用する前に、 connect-distributed.properties を変更して、スキーマレスJSONを使用するようにConnectを構成する必要があります。

key.converter.schemas.enable=false
value.converter.schemas.enable=false

その後、再び分散モードでConnectを再起動する必要があります。

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

ここでも、ソースコネクタPOSTの本文をJSONファイルとして作成する必要があります。 ここでは、これをconnect-file-source-transform.json。と呼びます。

既知のパラメーターに加えて、2つの必要な変換のために数行を追加します。

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

その後、POSTを実行しましょう。

curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

test-transformation.txtにいくつかの行を書いてみましょう。

Foo
Bar

connect-transformation トピックを調べると、次の行が表示されます。

{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}

9. レディコネクタの使用

これらの単純なコネクタを使用した後、すぐに使用できるより高度なコネクタとそのインストール方法を見てみましょう。

9.1. コネクタの場所

構築済みのコネクタは、さまざまなソースから入手できます。

  • いくつかのコネクタは、プレーンなApache Kafka(ファイルとコンソールのソースとシンク)にバンドルされています
  • さらにいくつかのコネクタがConfluentプラットフォームにバンドルされています(ElasticSearch、HDFS、JDBC、およびAWS S3)
  • また、Kafkaコネクタのアプリストアの一種である ConfluentHubもチェックしてください。 提供されるコネクタの数は継続的に増加しています。
    • Confluentコネクタ(開発、テスト、文書化され、Confluentによって完全にサポートされています)
    • 認定コネクタ(サードパーティによって実装され、Confluentによって認定されています)
    • コミュニティで開発およびサポートされているコネクタ
  • さらに、Confluentはコネクタページも提供します。これらのコネクタには、Confluentハブでも利用できますが、さらにいくつかのコミュニティコネクタもあります。
  • そして最後に、製品の一部としてコネクタを提供するベンダーもあります。 たとえば、Landoopは Lenses と呼ばれるストリーミングライブラリを提供します。このライブラリには、最大25個のオープンソースコネクタのセットも含まれています(それらの多くは他の場所でもクロスリストされています)

9.2. Confluentハブからのコネクタのインストール

Confluentのエンタープライズバージョンは、Confluent Hubからコネクタおよびその他のコンポーネントをインストールするためのスクリプトを提供します(スクリプトはオープンソースバージョンには含まれていません)。 エンタープライズバージョンを使用している場合は、次のコマンドを使用してコネクタをインストールできます。

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

9.3. コネクタの手動インストール

Confluent Hubで使用できないコネクタが必要な場合、またはConfluentのオープンソースバージョンを使用している場合は、必要なコネクタを手動でインストールできます。 そのためには、コネクタをダウンロードして解凍し、含まれているライブラリをplugin.path。として指定されたフォルダに移動する必要があります。

コネクタごとに、アーカイブには私たちにとって興味深い2つのフォルダが含まれている必要があります。

  • lib フォルダーには、 kafka-connect-mqtt-1.0.0-preview.jar などのコネクターjarと、コネクターに必要なその他のjarが含まれています。
  • etc フォルダーには、1つ以上の参照構成ファイルが保持されます

libフォルダーを$CONFLUENT_HOME / share / java 、またはconnect-standaloneでplugin.pathとして指定したパスに移動する必要があります。プロパティおよびconnect-distributed.properties。 そうすることで、フォルダの名前を意味のある名前に変更することも意味があります。

etc の構成ファイルは、スタンドアロンモードで起動するときに参照するか、プロパティを取得してJSONファイルを作成するだけで使用できます。

10. 結論

このチュートリアルでは、KafkaConnectをインストールして使用する方法を確認しました。

ソースとシンクの両方のコネクタのタイプを調べました。 また、Connectを実行できるいくつかの機能とモードについても説明しました。 次に、変圧器を確認しました。 そして最後に、カスタムコネクタの入手先とインストール方法を学びました。

いつものように、設定ファイルはGitHubで見つけることができます。