MQTTおよびMongoDBを使用したKafka Connectの例

1. 概要

link:/kafka-connectors-guide [前の記事]では、さまざまな種類のコネクタ、Connectの基本機能、REST APIなど、Kafka Connectの簡単な紹介がありました。
このチュートリアルでは、Kafkaコネクタを使用して、より「現実の世界」の例を構築します。
*コネクタを使用してMQTT経由でデータを収集し、収集したデータをMongoDBに書き込みます*

*2. Docker *を使用したセットアップ

https://docs.docker.com/compose/[Docker Compose]を使用してインフラストラクチャをセットアップします。 これには、ソースとしてのMQTTブローカー、Zookeeper、1つのKafkaブローカー、ミドルウェアとしてのKafka Connect、最後にGUIツールを含むMongoDBインスタンスが含まれます。

* 2.1。 コネクタのインストール*

この例に必要なコネクタであるMQTTソースとMongoDBシンクコネクタは、プレーンなKafkaまたはConfluentプラットフォームには含まれていません。
前の記事で説明したように、コネクター(https://www.confluent.io/connector/kafka-connect-mqtt/[MQTT]およびhttps://www.confluent.io/connector/をダウンロードできます。 confluentハブからのkafka-connect-mongodb-sink / [MongoDB])。 その後、jarをフォルダーに解凍する必要があります。フォルダーは、次のセクションのKafka Connectコンテナーにマウントします。
そのために_ / tmp / custom / jars_フォルダーを使用しましょう。 Kafka Connectは起動時にコネクタをオンラインでロードするため、次のセクションで構成スタックを開始する前に、jarをそこに移動する必要があります。

* 2.2。 Docker作成ファイル*

6つのコンテナで構成されるシンプルなDocker構成ファイルとしてセットアップを説明します。
version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"
_mosquitto_コンテナーは、Eclipse Mosquittoに基づいた単純なMQTTブローカーを提供します。
container _zookeeper_および_kafka_は、単一ノードのKafkaクラスターを定義します。
_kafka-connect_は、Connectアプリケーションを分散モードで定義します。
最後に、_mongo-db_は、シンクデータベースと、Webベースの_mongoclient_を定義します。これは、送信されたデータがデータベースに正しく到着したかどうかを確認するのに役立ちます。
次のコマンドを使用してスタックを開始できます。
docker-compose up

3. コネクタ構成

Kafka Connectが起動して実行されるようになったため、コネクタを構成できます。

* 3.1。 ソースコネクタの構成*

REST APIを使用してソースコネクタを設定しましょう。
curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
_connect-mqtt-source.json_ fileは次のようになります。
{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}
これまで使用したことのないプロパティがいくつかあります。
  • _mqtt.server.uri_は、コネクタが接続するエンドポイントです

  • _mqtt.topics_は、コネクタがサブスクライブするMQTTトピックです

  • _kafka.topic_は、コネクタが送信するKafkaトピックを定義します
    受信したデータ

  • _value.converter_は、に適用されるコンバーターを定義します
    受信したペイロード。 MQTTコネクタはデフォルトでBase64を使用するため、プレーンテキストを使用するため、_ByteArrayConverter_が必要です。

  • _confluent.topic.bootstrap.servers_は最新バージョンに必要です
    コネクタの

  • 同じことが_confluent.topic.replication.factor_にも当てはまります。
    Confluent-internalトピックのレプリケーションファクター–クラスターにはノードが1つしかないため、その値を1に設定する必要があります

* 3.2。 テストソースコネクタ*

MQTTブローカーに短いメッセージを発行して、簡単なテストを実行しましょう。
docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto  -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"
そして、トピックを聞くと、_connect-custom_:
docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning
その後、テストメッセージが表示されます。

* 3.3。 セットアップシンクコネクタ*

次に、シンクコネクタが必要です。 再びREST APIを使用しましょう。
curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
_connect-mongodb-sink.json_ファイルは次のようになります。
{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}
ここには、次のMongoDB固有のプロパティがあります。
  • _mongodb.connection.uri_には、接続文字列が含まれています
    MongoDBインスタンス

  • _mongodb.collection_はコレクションを定義します

  • MongoDBコネクターはJSONを想定しているため、設定する必要があります
    key.converter_および_value.converter_の_JsonConverter

  • また、MongoDBにはスキーマレスJSONも必要なので、設定する必要があります
    key.converter.schemas.enable_および_value.converter.schemas.enable_から_false

* 3.4。 テストシンクコネクタ*

トピック_connect-custom_にはMQTTコネクターテストからのメッセージが既に含まれているため、* MongoDBコネクターは作成後にそれらを直接フェッチする必要があります*。
したがって、MongoDBですぐに見つける必要があります。 ** URL http:// localhost:3000 / [_http:// localhost:3000 / _]を開くことにより、そのためのWebインターフェイスを使用できます。 **ログイン後、左側の_MyCollection_を選択し、_Execute_を押すと、テストメッセージが表示されます。

* 3.5。 エンドツーエンドテスト*

これで、MQTTクライアントを使用してJSON構造体を送信できます。
{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}
MongoDBはスキーマフリーのJSONドキュメントをサポートし、コンバーターのスキーマを無効にしたため、構造体はすぐにコネクタチェーンを通過してデータベースに保存されます。
繰り返しますが、http:// localhost:3000 / [_http:// localhost:3000 / _]のWebインターフェイスを使用できます。

* 3.6。 掃除*

完了したら、実験をクリーンアップして2つのコネクタを削除します。
curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink
その後、_Ctrl C_を使用してComposeスタックをシャットダウンできます。

4. 結論

このチュートリアルでは、MaftTを介してデータを収集し、収集したデータをMongoDBに書き込むために、Kafka Connectを使用した例を作成しました。
いつものように、設定ファイルはhttps://github.com/eugenp/tutorials/tree/master/libraries-data[on GitHub]で見つけることができます。