カフカコネクタの紹介
1概要
**
**
**
2 Kafka ConnectとKafka Connectorsの基本
-
**
-
一部のコネクタはコミュニティによって管理されていますが、その他のコネクタはConfluentまたはそのパートナーによってサポートされています。実際、S3、JDBC、Cassandraなど、最も一般的なシステム用のコネクタがいくつかあります。
3特徴
-
外部システムとKafkaを接続するためのフレームワーク – ** it
コネクタの開発、展開、管理を簡素化
**
**
**
**
-
変換 – これらは私たちがシンプルで軽量にすることを可能にします
個々のメッセージに対する修正
4セットアップ
-
普通のKafkaディストリビューションを使用する代わりに、Confluent社のKafkaの背後にある会社が提供するKafkaディストリビューションであるConfluent Platformをダウンロードします。 Confluent Platformには、普通のKafkaと比べていくつかの追加のツールやクライアント、およびいくつかの追加のビルド済みコネクタが付属しています。
私たちの場合は、オープンソース版で十分です。これはhttps://www.confluent.io/download/[Confluentのサイト]にあります。
5クイックスタートKafka Connect
まず最初に、Kafka Connectの原理について、最も基本的なConnectorを使って議論しましょう。これは
source
connectorとファイル
sink
connector ** です。
便利なことに、Confluent Platformには、これら両方のコネクターと参照構成が付属しています。
5.1. ソースコネクタの設定
ソースコネクタの場合、参照設定は
$ CONFLUENT
HOME/etc/kafka/connect-file-source.properties__にあります。
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
この構成には、すべてのソースコネクタに共通のいくつかのプロパティがあります。
-
name
は、コネクタインスタンスのユーザー指定の名前です。 -
connector.class
は実装クラス、基本的には種類を指定します
コネクタの
**
tasks.max
は、ソースコネクタのインスタンス数を指定します
並行して実行する必要があり、
**
topic
は、コネクターが送信するトピックを定義します。
出力
-
この場合、コネクタ固有の属性もあります。
-
**
file
は、コネクターが読み込むファイルを定義します。
入力**
これがうまくいくためには、いくつかのコンテンツを含む基本的なファイルを作成しましょう。
echo -e "foo\nbar\n" > $CONFLUENT__HOME/test.txt
作業ディレクトリは$ CONFLUENT__HOMEです。
5.2. シンクコネクタの構成
シンクコネクタには、
$ CONFLUENT
HOME/etc/kafka/connect-file-sink.properties__にある参照設定を使用します。
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
論理的には、正確に同じパラメータが含まれています。今回は
connector.class
にシンクコネクタの実装を指定し、
file
にコネクタがコンテンツを書き込む場所を指定します。
5.3. ワーカー構成
最後に、Connectワーカーを設定する必要があります。これにより、2つのコネクタが統合され、ソースコネクタからの読み取りとシンクコネクタへの書き込みが行われます。
そのためには、
$ CONFLUENT
HOME/etc/kafka/connect-standalone.properties__を使用できます。
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java
plugin.path
は、コネクタ実装が利用可能なパスのリストを保持できます。
Kafkaにバンドルされているコネクタを使用するので、
plugin.path
を
$ CONFLUENT
HOME/share/java__に設定できます。 Windowsを使用している場合は、ここで絶対パスを指定する必要があるかもしれません。
他のパラメータについては、デフォルト値のままにします。
-
bootstrap.servers
には、Kafkaブローカーの住所が含まれています。 -
key.converter
と
value.converter
はコンバータークラスを定義します。
データがソースからソースに流れるときにデータをシリアライズおよびデシリアライズします。
KafkaそしてそれからKafkaから流しへ
**
key.converter.schemas.enable
および
value.converter.schemas.enable
コンバータ固有の設定です
offset.storage.file.filename
は、次の場合に最も重要な設定です。
Connectをスタンドアロンモードで実行する:Connectがどこにあるべきかを定義します。
そのオフセットデータを保存する
offset.flush.interval.ms
は、ワーカーが実行する間隔を定義します。
タスクのオフセットをコミットしようとしています
そしてパラメータのリストは非常に成熟しているので、完全なリストについてはhttp://kafka.apache.org/documentation/#connectconfigs[公式ドキュメント]を調べてください。
5.4. スタンドアロンモードでのKafka Connect
これで、最初のコネクタ設定を開始できます。
$CONFLUENT__HOME/bin/connect-standalone \
$CONFLUENT__HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT__HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT__HOME/etc/kafka/connect-file-sink.properties
まず、コマンドラインを使ってトピックの内容を調べることができます。
$CONFLUENT__HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
ご覧のとおり、ソースコネクタは
test.txt
ファイルからデータを取得し、それをJSONに変換してKafkaに送信しました。
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
そして、
$ CONFLUENT
HOME
フォルダを見ると、ファイル
test.sink.txt
__がここに作成されていることがわかります。
cat $CONFLUENT__HOME/test.sink.txt
foo
bar
シンクコネクタが
payload
属性から値を抽出して宛先ファイルに書き込むと、
test.sink.txt
内のデータには元の
__test.txt
__fileの内容が含まれます。
それでは__test.txtにさらに行を追加しましょう。
これを行うと、ソースコネクタがこれらの変更を自動的に検出することがわかります。
最後に必ず改行を挿入してください。そうしないと、ソースコネクタは最後の行を考慮しません。**
この時点で、接続プロセスを停止しましょう。接続を数行で
分散モード
で開始します。
6. ConnectのREST API
これまでは、コマンドラインからプロパティファイルを渡してすべての設定を行っていました。ただし、Connectはサービスとして実行するように設計されているため、REST APIも利用できます。
デフォルトでは、http://localhost:8083 __にあります。いくつかのエンドポイントは以下のとおりです。
-
GET/connectors
– 使用中のすべてのコネクターを含むリストを返します -
GET/connectors/\ {name}
– 特定のコネクターに関する詳細を返します -
POST/connectors
– 新しいコネクターを作成します。リクエストボディは
文字列の名前フィールドと
コネクター構成パラメーターを持つオブジェクト構成フィールド
**
GET/connectors/\ {name}/status
– の現在のステータスを返します
コネクタ – 実行中、失敗、または一時停止中を含む – どのワーカー
割り当てられている、失敗した場合のエラー情報、および状態
そのすべてのタスク
**
DELETE/connectors/\ {name}
– 優雅にコネクターを削除する
すべてのタスクを停止してその設定を削除する
**
__GET/connector-plugins –
__コネクタプラグインのリストを返す
Kafka Connectクラスターにインストールされている
official documentation
には、すべてのエンドポイントのリストがあります。
次のセクションでは、新しいコネクタを作成するためにREST APIを使用します。
** 7. 分散モードでのKafka Connect
スタンドアローンモードは、開発やテスト、さらに小規模なセットアップでも完璧に機能します。ただし、Kafkaの分散機能を最大限に活用したい場合は、Connectを分散モードで起動する必要があります。
これにより、コネクタ設定とメタデータはファイルシステムではなくKafkaトピックに保存されます。結果として、ワーカーノードは本当にステートレスです。
7.1. Connect
を起動する
分散モードの参照構成は、$ CONFLUENT
HOME
/etc/kafka/connect-distributed.properties .
__にあります。
-
パラメータはほとんどスタンドアローンモードと同じです。わずかな違いがあります。
-
group.id
は、Connectクラスタグループの名前を定義します。値
どのコンシューマグループIDとも異なる必要があります
**
offset.storage.topic
、
config.storage.topic
、
status.storage.topic
はこれらの設定のトピックを定義します。トピックごとに、複製係数も定義できます。
繰り返しますが、http://kafka.apache.org/documentation/#connectconfigs[非公開文書]にすべてのパラメータの一覧があります。
次のようにしてConnectを分散モードで起動できます。
$CONFLUENT__HOME/bin/connect-distributed $CONFLUENT__HOME/etc/kafka/connect-distributed.properties
7.2. REST APIを使用したコネクタの追加
現在、スタンドアロンの起動コマンドと比較して、コネクタ設定は引数として渡されていません。代わりに、REST APIを使用してコネクタを作成する必要があります。
前の例を設定するには、次のJSON構造体を含む
http://localhost:8083/connectors
に2つのPOSTリクエストを送信する必要があります。
まず、ソースコネクタPOSTのボディをJSONファイルとして作成する必要があります。ここでは、それを
connect-file-source.json
と呼びます。
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}
-
これが最初に使用した参照設定ファイルとかなり似ていることに注意してください。
そしてそれをPOSTします。
curl -d @"$CONFLUENT__HOME/connect-file-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
それから、シンクコネクタについても同じようにして、ファイル
connect-file-sink.json
を呼び出します。
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}
そして以前のようにPOSTを実行します。
curl -d @$CONFLUENT__HOME/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
必要に応じて、この設定が正しく機能していることを確認できます。
$CONFLUENT__HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
そして、
$ CONFLUENT
HOME
フォルダーを見れば、ファイル
test-distributed.sink.txt
__がここで作成されたことがわかります。
cat $CONFLUENT__HOME/test-distributed.sink.txt
foo
bar
分散セットアップをテストした後、2つのコネクタを外してクリーンアップしましょう。
curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink
8データを変換する
8.1. サポートされている変換
変換により、個々のメッセージを簡単かつ軽量に変更できます。
Kafka Connectは以下の組み込み変換をサポートしています。
-
InsertField
– 静的データまたはレコードを使用してフィールドを追加します
メタデータ
**
ReplaceField
– フィールドをフィルタリングまたは名前変更する
-
MaskField
– フィールドをその型の有効なNULL値に置き換えます。
(ゼロまたは空の文字列など)
**
HoistField
– イベント全体をstruct内の単一フィールドとしてラップする
または地図
**
ExtractField
– 構造体から特定のフィールドを抽出してマップし、
このフィールドのみを結果に含める
**
SetSchemaMetadata
– スキーマ名またはバージョンを変更する
-
TimestampRouter
– オリジナルに基づいてレコードのトピックを修正する
トピックとタイムスタンプ
**
RegexRouter
– 元のトピックに基づいてレコードのトピックを修正します。
置換文字列と正規表現
変換は次のパラメータを使用して設定されます。
-
transforms
– の別名のコンマ区切りリスト
変形
**
transforms。$ alias.type
– 変換のためのクラス名
-
transforms。$ alias。$ transformationSpecificConfig
– 設定
それぞれの変換
8.2. トランスフォーマーを適用する
いくつかの変換機能をテストするには、次の2つの変換を設定しましょう。
-
まず、メッセージ全体をJSON構造体としてラップしましょう。
-
その後、その構造体にフィールドを追加しましょう
変換を適用する前に、
connect-distributed.properties
を変更して、スキーマレスJSONを使用するようにConnectを設定する必要があります。
key.converter.schemas.enable=false
value.converter.schemas.enable=false
その後、Connectを再起動しなければなりません。これは分散モードです。
$CONFLUENT__HOME/bin/connect-distributed $CONFLUENT__HOME/etc/kafka/connect-distributed.properties
繰り返しますが、ソースコネクタPOSTのボディをJSONファイルとして作成する必要があります。ここでは、それを__connect-file-source-transform.jsonと呼びます。
既知のパラメータに加えて、必要な2つの変換について数行を追加します。
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data__source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
その後、POSTを実行しましょう。
curl -d @$CONFLUENT__HOME/connect-file-source-transform.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
test-transformation.txt
にいくつかの行を書きましょう。
Foo
Bar
ここで
connect-transformation
トピックを調べると、次の行が表示されるはずです。
{"line":"Foo","data__source":"test-file-source"}
{"line":"Bar","data__source":"test-file-source"}
9準備完了コネクタの使用
これらの単純なコネクタを使用した後で、より高度なすぐ使えるコネクタとそれらをインストールする方法を見てみましょう。
9.1. コネクタの場所
-
既製のコネクタはさまざまな供給元から入手できます。**
-
いくつかのコネクタは普通のApache Kafka(sourceとsink)にバンドルされています。
ファイルとコンソール用)
** Confluenceプラットフォームには、さらにいくつかのコネクタがバンドルされています。
(ElasticSearch、HDFS、JDBC、およびAWS S3)
**
Confluent Hub
もチェック
Kafkaコネクタ用のApp Storeのようなものです。提供されるコネクタの数は増え続けています。
コンフルエントコネクタ
Confluentによるサポート)
認定コネクタ(サードパーティによって実装され、認定されたコネクタ
コンフルエント)
コミュニティ開発およびサポートされているコネクタ
-
それを超えて、Confluentはまた、
Connectors Page
、一部は
Confluent Hubでも利用可能なコネクタだけでなく、
いくつかのコミュニティコネクタ
** そして最後に、コネクタの一部としてコネクタを提供するベンダもあります。
彼らの製品たとえば、Landoopはhttps://www.landoop.com/downloads/[Lenses]というストリーミングライブラリを提供しています。これには、〜25個のオープンソースコネクタも含まれています(それらの多くは他の場所でもクロスリストになっています)
9.2. Confluenceハブからのコネクタのインストール
Confluenceのエンタープライズ版には、Confluenceハブからコネクターおよびその他のコンポーネントをインストールするためのスクリプトが用意されています(このスクリプトはオープンソース版には含まれていません)。エンタープライズ版を使用している場合は、次のコマンドを使用してコネクタをインストールできます。
$CONFLUENT__HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview
9.3. 手動でコネクタをインストールする
Confluent Hubで利用できないコネクタが必要な場合、またはConfluenceのオープンソースバージョンがある場合は、必要なコネクタを手動でインストールできます。そのためには、コネクタをダウンロードして解凍し、含まれているlibを__plugin.pathとして指定されているフォルダに移動する必要があります。
各コネクタについて、アーカイブには2つの興味深いフォルダが含まれています。
-
lib
フォルダには、コネクタjarが含まれています。
例:
kafka-connect-mqtt-1.0.0-preview.jar
、およびその他
コネクタに必要なjarファイル
**
etc
フォルダには、1つ以上の参照設定ファイルがあります。
-
lib
フォルダを
$ CONFLUENT
HOME/share/java
、または
connect-standalone.properties
と
connect-distributed.properties
で
plugin.path__として指定したパスに移動する必要があります。
その際、フォルダーの名前を意味のある名前に変更することも意味があるかもしれません。
スタンドアローンモードで起動している間にそれらを参照することによって
etc
から設定ファイルを使用することができます、あるいは単にそれらを取得してそれらからJSONファイルを作成することができます。
10結論
いつものように、設定ファイルはhttps://github.com/eugenp/tutorials/tree/master/libraries-data[on GitHub]にあります。