1. 概要

前回の記事では、さまざまなタイプのコネクタ、Connectの基本機能、REST APIなど、KafkaConnectについて簡単に紹介しました。

このチュートリアルでは、Kafkaコネクタを使用して、より「現実的な」例を作成します。

コネクタを使用してMQTTを介してデータを収集し、収集したデータをMongoDBに書き込みます。

2. Dockerを使用したセットアップ

DockerComposeを使用してインフラストラクチャをセットアップします。 これには、ソースとしてのMQTTブローカー、Zookeeper、ミドルウェアとしての1つのKafkaブローカーとKafka Connect、そして最後にシンクとしてのGUIツールを含むMongoDBインスタンスが含まれます。

2.1. コネクタの取り付け

この例に必要なコネクタ、MQTTソースとMongoDBシンクコネクタは、プレーンなKafkaまたはConfluentプラットフォームには含まれていません。

前回の記事で説明したように、Confluentハブからコネクタ(MQTTおよびMongoDB)をダウンロードできます。 その後、jarファイルをフォルダーに解凍する必要があります。このフォルダーは、次のセクションでKafkaConnectコンテナーにマウントします。

そのためにフォルダ/tmp / custom /jarsを使用しましょう。 Kafka Connectは起動時にコネクタをオンラインでロードするため、次のセクションで作成スタックを開始する前に、jarをそこに移動する必要があります。

2.2. Docker作成ファイル

セットアップを、6つのコンテナーで構成される単純なDocker構成ファイルとして説明します。

version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

mosquitto コンテナーは、EclipseMosquittoに基づく単純なMQTTブローカーを提供します。

コンテナーzookeeperおよびkafkaは、単一ノードのKafkaクラスターを定義します。

kafka-connect は、Connectアプリケーションを分散モードで定義します。

そして最後に、 mongo-db は、シンクデータベースと、送信されたデータがデータベースに正しく到着したかどうかを確認するのに役立つWebベースのmongoclientを定義します。

次のコマンドを使用してスタックを開始できます。

docker-compose up

3. コネクタ構成

Kafka Connectが稼働しているので、コネクタを構成できます。

3.1. ソースコネクタを構成する

RESTAPIを使用してソースコネクタを構成しましょう。

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

connect-mqtt-source.jsonファイルは次のようになります。

{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}

これまで使用したことのないプロパティがいくつかあります。

  • mqtt.server.uri は、コネクタが接続するエンドポイントです
  • mqtt.topics は、コネクタがサブスクライブするMQTTトピックです
  • kafka.topic は、コネクタが受信したデータを送信するKafkaトピックを定義します
  • value.converter は、受信したペイロードに適用されるコンバーターを定義します。 MQTTコネクタはデフォルトでBase64を使用しますが、プレーンテキストを使用するため、ByteArrayConverterが必要です。
  • confluent.topic.bootstrap.servers は、最新バージョンのコネクタに必要です
  • 同じことがconfluent.topic.replication.factorにも当てはまります。これは、Confluent内部トピックのレプリケーションファクターを定義します。クラスターにはノードが1つしかないため、その値を1に設定する必要があります。

3.2. テストソースコネクタ

MQTTブローカーに短いメッセージを公開して、簡単なテストを実行してみましょう。

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto  -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"

そして、このトピックを聞くと、 connect-custom

docker run \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --network 04_custom_default --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

次に、テストメッセージが表示されます。

3.3. シンクコネクタの設定

次に、シンクコネクタが必要です。 もう一度RESTAPIを使用してみましょう。

curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

connect-mongodb-sink.jsonファイルは次のようになります。

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

ここには、次のMongoDB固有のプロパティがあります。

  • mongodb.connection.uri には、MongoDBインスタンスの接続文字列が含まれています
  • mongodb.collectionはコレクションを定義します
  • MongoDBコネクタはJSONを想定しているため、key.converterおよびvalue.converterJsonConverterを設定する必要があります。
  • また、MongoDBにはスキーマレスJSONも必要なので、key.converter.schemas.enableおよびvalue.converter.schemas.enablefalseに設定する必要があります。

3.4. テストシンクコネクタ

トピックconnect-customにはすでにMQTTコネクタテストからのメッセージが含まれているため、MongoDBコネクタは作成直後にメッセージをフェッチする必要があります

したがって、MongoDBですぐに見つける必要があります。 URL http:// localhost:3000 /を開くと、そのためのWebインターフェイスを使用できます。 ログイン後、 私のコレクション左側で、 実行する 、およびテストメッセージが表示されます。

3.5. エンドツーエンドテスト

これで、MQTTクライアントを使用して任意のJSON構造体を送信できます。

{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}

MongoDBはスキーマフリーのJSONドキュメントをサポートしており、コンバーターのスキーマを無効にしたため、構造体はすぐにコネクターチェーンを通過し、データベースに保存されます。

ここでも、 http:// localhost:3000/でWebインターフェイスを使用できます。

3.6. 掃除

完了したら、実験をクリーンアップして2つのコネクタを削除できます。

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink

その後、 Ctrl +Cを使用してComposeスタックをシャットダウンできます。

4. 結論

このチュートリアルでは、Kafka Connectを使用して、MQTTを介してデータを収集し、収集したデータをMongoDBに書き込む例を作成しました。

いつものように、設定ファイルはGitHubで見つけることができます。