1. 概要
前回の記事では、さまざまなタイプのコネクタ、Connectの基本機能、REST APIなど、KafkaConnectについて簡単に紹介しました。
このチュートリアルでは、Kafkaコネクタを使用して、より「現実的な」例を作成します。
コネクタを使用してMQTTを介してデータを収集し、収集したデータをMongoDBに書き込みます。
2. Dockerを使用したセットアップ
DockerComposeを使用してインフラストラクチャをセットアップします。 これには、ソースとしてのMQTTブローカー、Zookeeper、ミドルウェアとしての1つのKafkaブローカーとKafka Connect、そして最後にシンクとしてのGUIツールを含むMongoDBインスタンスが含まれます。
2.1. コネクタの取り付け
この例に必要なコネクタ、MQTTソースとMongoDBシンクコネクタは、プレーンなKafkaまたはConfluentプラットフォームには含まれていません。
前回の記事で説明したように、Confluentハブからコネクタ(MQTTおよびMongoDB)をダウンロードできます。 その後、jarファイルをフォルダーに解凍する必要があります。このフォルダーは、次のセクションでKafkaConnectコンテナーにマウントします。
そのためにフォルダ/tmp / custom /jarsを使用しましょう。 Kafka Connectは起動時にコネクタをオンラインでロードするため、次のセクションで作成スタックを開始する前に、jarをそこに移動する必要があります。
2.2. Docker作成ファイル
セットアップを、6つのコンテナーで構成される単純なDocker構成ファイルとして説明します。
version: '3.3'
services:
mosquitto:
image: eclipse-mosquitto:1.5.5
hostname: mosquitto
container_name: mosquitto
expose:
- "1883"
ports:
- "1883:1883"
zookeeper:
image: zookeeper:3.4.9
restart: unless-stopped
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
kafka:
image: confluentinc/cp-kafka:5.1.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka-connect:
image: confluentinc/cp-kafka-connect:5.1.0
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /tmp/custom/jars:/etc/kafka-connect/jars
depends_on:
- zookeeper
- kafka
- mosquitto
mongo-db:
image: mongo:4.0.5
hostname: mongo-db
container_name: mongo-db
expose:
- "27017"
ports:
- "27017:27017"
command: --bind_ip_all --smallfiles
volumes:
- ./mongo-db:/data
mongoclient:
image: mongoclient/mongoclient:2.2.0
container_name: mongoclient
hostname: mongoclient
depends_on:
- mongo-db
ports:
- 3000:3000
environment:
MONGO_URL: "mongodb://mongo-db:27017"
PORT: 3000
expose:
- "3000"
mosquitto コンテナーは、EclipseMosquittoに基づく単純なMQTTブローカーを提供します。
コンテナーzookeeperおよびkafkaは、単一ノードのKafkaクラスターを定義します。
kafka-connect は、Connectアプリケーションを分散モードで定義します。
そして最後に、 mongo-db は、シンクデータベースと、送信されたデータがデータベースに正しく到着したかどうかを確認するのに役立つWebベースのmongoclientを定義します。
次のコマンドを使用してスタックを開始できます。
docker-compose up
3. コネクタ構成
Kafka Connectが稼働しているので、コネクタを構成できます。
3.1. ソースコネクタを構成する
RESTAPIを使用してソースコネクタを構成しましょう。
curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
connect-mqtt-source.jsonファイルは次のようになります。
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics": "baeldung",
"kafka.topic": "connect-custom",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1
}
}
これまで使用したことのないプロパティがいくつかあります。
- mqtt.server.uri は、コネクタが接続するエンドポイントです
- mqtt.topics は、コネクタがサブスクライブするMQTTトピックです
- kafka.topic は、コネクタが受信したデータを送信するKafkaトピックを定義します
- value.converter は、受信したペイロードに適用されるコンバーターを定義します。 MQTTコネクタはデフォルトでBase64を使用しますが、プレーンテキストを使用するため、ByteArrayConverterが必要です。
- confluent.topic.bootstrap.servers は、最新バージョンのコネクタに必要です
- 同じことがconfluent.topic.replication.factorにも当てはまります。これは、Confluent内部トピックのレプリケーションファクターを定義します。クラスターにはノードが1つしかないため、その値を1に設定する必要があります。
3.2. テストソースコネクタ
MQTTブローカーに短いメッセージを公開して、簡単なテストを実行してみましょう。
docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"
そして、このトピックを聞くと、 connect-custom :
docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning
次に、テストメッセージが表示されます。
3.3. シンクコネクタの設定
次に、シンクコネクタが必要です。 もう一度RESTAPIを使用してみましょう。
curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
connect-mongodb-sink.jsonファイルは次のようになります。
{
"name": "mongodb-sink",
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"tasks.max": 1,
"topics": "connect-custom",
"mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
"mongodb.collection": "MyCollection",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
ここには、次のMongoDB固有のプロパティがあります。
- mongodb.connection.uri には、MongoDBインスタンスの接続文字列が含まれています
- mongodb.collectionはコレクションを定義します
- MongoDBコネクタはJSONを想定しているため、key.converterおよびvalue.converterにJsonConverterを設定する必要があります。
- また、MongoDBにはスキーマレスJSONも必要なので、key.converter.schemas.enableおよびvalue.converter.schemas.enableをfalseに設定する必要があります。
3.4. テストシンクコネクタ
トピックconnect-customにはすでにMQTTコネクタテストからのメッセージが含まれているため、MongoDBコネクタは作成直後にメッセージをフェッチする必要があります。
したがって、MongoDBですぐに見つける必要があります。 URL http:// localhost:3000 /を開くと、そのためのWebインターフェイスを使用できます。 ログイン後、 私のコレクション左側で、 実行する 、およびテストメッセージが表示されます。
3.5. エンドツーエンドテスト
これで、MQTTクライアントを使用して任意のJSON構造体を送信できます。
{
"firstName": "John",
"lastName": "Smith",
"age": 25,
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021"
},
"phoneNumber": [{
"type": "home",
"number": "212 555-1234"
}, {
"type": "fax",
"number": "646 555-4567"
}],
"gender": {
"type": "male"
}
}
MongoDBはスキーマフリーのJSONドキュメントをサポートしており、コンバーターのスキーマを無効にしたため、構造体はすぐにコネクターチェーンを通過し、データベースに保存されます。
ここでも、 http:// localhost:3000/でWebインターフェイスを使用できます。
3.6. 掃除
完了したら、実験をクリーンアップして2つのコネクタを削除できます。
curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink
その後、 Ctrl +Cを使用してComposeスタックをシャットダウンできます。
4. 結論
このチュートリアルでは、Kafka Connectを使用して、MQTTを介してデータを収集し、収集したデータをMongoDBに書き込む例を作成しました。
いつものように、設定ファイルはGitHubので見つけることができます。