MQTT、NiFi、およびInfluxDBを使用したIoTデータパイプライン
1. 序章
このチュートリアルでは、IoTアプリケーションのデータパイプラインを作成するときに必要なものを学習します。
その過程で、IoTアーキテクチャの特徴を理解し、MQTTブローカー、NiFi、InfluxDBなどのさまざまなツールを活用して、IoTアプリケーション用の高度にスケーラブルなデータパイプラインを構築する方法を見ていきます。
2. IoTとそのアーキテクチャ
まず、いくつかの基本的な概念を確認し、IoTアプリケーションの一般的なアーキテクチャを理解しましょう。
2.1. IoTとは?
モノのインターネット(IoT)は、「モノ」として知られる物理オブジェクトのネットワークを広く指します。 たとえば、電球などの一般的な家庭用品から高度な産業機器まで、あらゆるものを含めることができます。 このネットワークを介して、さまざまなセンサーやアクチュエーターをインターネットに接続し、データを交換することができます。
現在、私たちは非常に異なる環境に物事を展開できます。たとえば、環境は私たちの家でも、移動する貨物トラックのようにまったく異なるものでもかまいません。 ただし、これらに利用できる電源とネットワークの品質については、実際には何も想定できません。 その結果、これによりIoTアプリケーションに固有の要件が生じます。
2.2. IoTアーキテクチャの概要
一般的なIoTアーキテクチャは、通常、それ自体を4つの異なるレイヤーに構造化します。 データが実際にこれらのレイヤーをどのように流れるかを理解しましょう。
まず、センシング層は、主に環境から測定値を収集するセンサーで構成されています。 次に、ネットワーク層は生データを集約し、処理のためにインターネット経由で送信するのに役立ちます。 さらに、データ処理レイヤーは生データをフィルタリングし、初期の分析を生成します。 最後に、アプリケーション層は強力なデータ処理機能を採用して、データのより詳細な分析と管理を実行します。
3. MQTT、NiFi、およびInfluxDBの概要
それでは、今日のIoTセットアップで広く使用されているいくつかの製品を見てみましょう。 これらはすべて、IoTアプリケーションのデータ要件に適した独自の機能を提供します。
3.1. MQTT
メッセージキューテレメトリトランスポート(MQTT)は、軽量のパブリッシュ/サブスクライブネットワークプロトコルです。 現在、OASISおよびISO標準です。 IBMは当初、デバイス間でメッセージを転送するために開発しました。 MQTTは、メモリ、ネットワーク帯域幅、および電源が不足している制約のある環境に適しています。
MQTT は、クライアントサーバーモデルに従います。このモデルでは、さまざまなコンポーネントがクライアントとして機能し、TCPを介してサーバーに接続できます。 このサーバーはMQTTブローカーとして知られています。 クライアントは、トピックと呼ばれるアドレスにメッセージを公開できます。 また、トピックをサブスクライブして、トピックに公開されているすべてのメッセージを受信することもできます。
一般的なIoTセットアップでは、センサーは温度などの測定値をMQTTブローカーに公開でき、アップストリームデータ処理システムはこれらのトピックにサブスクライブしてデータを受信できます。
ご覧のとおり、MQTTのトピックは階層的です。 システムは、ワイルドカードを使用して、トピックの階層全体を簡単にサブスクライブできます。
MQTT は、3つのレベルのサービス品質(QoS)をサポートします。 これらは、「最大1回配信」、「少なくとも1回配信」、「1回だけ配信」です。 QoSは、クライアントとサーバー間の合意のレベルを定義します。 各クライアントは、環境に適したサービスのレベルを選択できます。
クライアントは、公開中にメッセージを永続化するようにブローカーに要求することもできます。 一部のセットアップでは、MQTTブローカーが接続するためにクライアントからのユーザー名とパスワードの認証を必要とする場合があります。 さらに、プライバシーのために、TCP接続はSSL/TLSで暗号化される場合があります。
使用可能なMQTTブローカーの実装とクライアントライブラリがいくつかあります。たとえば、 HiveMQ 、 Mosquitto 、 PahoMQTTなどです。 このチュートリアルの例では、Mosquittoを使用します。 MosquittoはEclipseFoundationの一部であり、RaspberryPiやArduinoなどのボードに簡単にインストールできます。
3.2. Apache NiFi
Apache NiFi は、もともとNSAによってNiagaraFilesとして開発されました。 システム間のデータフローの自動化と管理を容易にし、アプリケーションをブラックボックスプロセスのネットワークとして定義するフローベースプログラミングモデルに基づいています。
最初にいくつかの基本的な概念を見ていきましょう。 NiFiでシステム内を移動するオブジェクトは、FlowFileと呼ばれます。 FlowFileプロセッサは、実際には、FlowFileのルーティング、変換、メディエーションなどの便利な作業を実行します。 FlowFileプロセッサはConnectionsに接続されています。
プロセスグループは、コンポーネントをグループ化してNiFiのデータフローを整理するメカニズムです。 プロセスグループは、入力ポートを介してデータを受信し、出力ポートを介してデータを送信できます。 リモートプロセスグループ(RPG)は、NiFiのリモートインスタンスとの間でデータを送受信するメカニズムを提供します。
さて、その知識を持って、NiFiアーキテクチャを見てみましょう。
NiFiは、JVM内で複数のコンポーネントを実行するJavaベースのプログラムです。 Webサーバーは、コマンドアンドコントロールAPIをホストするコンポーネントです。 Flow Controllerは、拡張機能が実行するリソースを受け取るスケジュールを管理するNiFiのコアコンポーネントです。 拡張機能により、NiFiを拡張可能にし、さまざまなシステムとの統合をサポートできます。
NiFiは、FlowFileリポジトリ内のFlowFileの状態を追跡します。 FlowFileの実際のコンテンツバイトは、コンテンツリポジトリにあります。 最後に、FlowFileに関連する来歴イベントデータは来歴リポジトリにあります。
ソースでのデータ収集には、より小さなフットプリントと低リソース消費が必要な場合があるため、NiFiにはMiNiFiと呼ばれるサブプロジェクトがあります。 MiNiFiは、NiFi の補完的なデータ収集アプローチを提供し、Site-to-Site(S2S)プロトコルを介してNiFiと簡単に統合できます。
さらに、 MiNiFiコマンドアンドコントロール(C2)プロトコルによるエージェントの集中管理を可能にします。 さらに、CoC情報の完全なチェーンを生成することにより、データの出所を確立するのに役立ちます。
3.3. InfluxDB
InfluxDB は、 Go で記述され、InfluxDataによって開発された時系列データベースです。 これは、時系列データの高速で高可用性の保存と取得のために設計されています。 これは、アプリケーションメトリック、IoTセンサーデータ、およびリアルタイム分析の処理に特に適しています。
まず、InfluxDBのデータは時系列で整理されています。 時系列には、ゼロまたは多くのポイントを含めることができます。 ポイントは、4つのコンポーネント(測定、タグセット、フィールドセット、およびタイムスタンプ)を持つ単一のデータレコードを表します。
まず、タイムスタンプは、特定のポイントに関連付けられたUTC日時を示します。 フィールドセットは、1つ以上のフィールドキーとフィールド値のペアで構成されます。 ポイントのラベルを使用して実際のデータをキャプチャします。 同様に、タグセットはタグとキーと値のペアで構成されますが、これらはオプションです。 これらは基本的にポイントのメタデータとして機能し、クエリ応答を高速化するためにインデックスを付けることができます。
測定値は、タグセット、フィールドセット、およびタイムスタンプのコンテナとして機能します。 さらに、InfluxDBのすべてのポイントに、保持ポリシーを関連付けることができます。 保持ポリシーは、InfluxDBがデータを保持する期間と、レプリケーションによって作成されるコピーの数を示します。
最後に、データベースは、ユーザー、保持ポリシー、継続的なクエリ、および時系列データの論理コンテナーとして機能します。 InfluxDBのデータベースは、従来のリレーショナルデータベースと大まかに似ていると理解できます。
さらに、InfluxDBはInfluxDataプラットフォームの一部であり、時系列データを効率的に処理するための他のいくつかの製品を提供しています。 InfluxDataは現在、オープンソースプラットフォームであるInfluxDB OSS 2.0と、商用製品であるInfluxDBCloudとして提供しています。
InfluxDBとは別に、プラットフォームには Chronograf が含まれており、InfluxDataプラットフォームの完全なインターフェイスを提供します。 さらに、メトリックとイベントを収集およびレポートするためのエージェントであるTelegrafが含まれています。 最後に、リアルタイムストリーミングデータ処理エンジンであるKapacitorがあります。
4. IoTデータパイプラインの実践
これで、これらの製品を一緒に使用してIoTアプリケーションのデータパイプラインを作成するのに十分な基礎をカバーしました。 このチュートリアルでは、複数の都市にまたがる複数の観測所から大気質関連の測定値を収集していると仮定します。 たとえば、測定には、対流圏オゾン、一酸化炭素、二酸化硫黄、二酸化窒素、およびエアロゾルが含まれます。
4.1. インフラストラクチャのセットアップ
まず、都市のすべての気象観測所にすべてのセンシング機器が装備されていると仮定します。 さらに、これらのセンサーはRaspberry Piのようなボードに配線され、アナログデータを収集してデジタル化します。 ボードはワイヤレスに接続され、生の測定値をアップストリームに送信します。
地域の管制局は、市内のすべての気象観測所からデータを収集します。 このデータを集約してローカル分析エンジンにフィードすることで、より迅速な洞察を得ることができます。 すべての地域コントロールセンターからのフィルタリングされたデータは、ほとんどがクラウドでホストされている中央コマンドセンターに送信されます。
4.2. IoTアーキテクチャの作成
これで、単純な空気品質アプリケーション用のIoTアーキテクチャを設計する準備が整いました。 ここでは、MQTTブローカー、MiNiFi Javaエージェント、NiFi、およびInfluxDBを使用します。
ご覧のとおり、気象観測所のサイトでMosquittoMQTTブローカーとMiNiFiJavaエージェントを使用しています。 地域のコントロールセンターでは、NiFiサーバーを使用してデータを集約およびルーティングしています。 最後に、InfluxDBを使用して、コマンドセンターレベルで測定値を保存しています。
4.3. インストールの実行
MosquittoMQTTブローカーとMiNiFiJavaエージェントをRaspberryPiのようなボードにインストールするのは非常に簡単です。 ただし、このチュートリアルでは、ローカルマシンにインストールします。
Eclipse Mosquito の公式ダウンロードページは、いくつかのプラットフォーム用のバイナリを提供します。 インストールすると、インストールディレクトリからMosquittoを起動するのは非常に簡単です。
net start mosquitto
さらに、NiFiバイナリは公式サイトからダウンロードすることもできます。 ダウンロードしたアーカイブを適切なディレクトリに抽出する必要があります。 MiNiFiはサイト間プロトコルを使用してNiFiに接続するため、 サイト間入力ソケットポートを指定する必要がありますの
# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.port=1026
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
次に、NiFiを開始できます。
<NIFI_HOME>/bin/run-nifi.bat
同様に、JavaまたはC++ MiNiFiエージェントおよびツールキットバイナリは、公式サイトからダウンロードできます。 ここでも、適切なディレクトリにアーカイブを抽出する必要があります。
MiNiFi(デフォルトでは)には、ごくわずかなプロセッサセットが付属しています。 MQTTからデータを消費するため、MQTTプロセッサをにコピーする必要があります
COPY <NIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar <MINIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar
次に、MiNiFiエージェントを開始できます。
<MINIFI_HOME>/bin/run-minifi.bat
最後に、公式サイトからInfluxDBのオープンソースバージョンをダウンロードできます。 以前と同様に、アーカイブを抽出して、次の簡単なコマンドでInfluxDBを起動できます。
<INFLUXDB_HOME>/influxd.exe
このチュートリアルのデフォルトとして、ポートを含む他のすべての構成を保持する必要があります。 これで、ローカルマシンへのインストールとセットアップは完了です。
4.4. NiFiデータフローの定義
これで、データフローを定義する準備が整いました。 NiFi は、データフローを作成および監視するための使いやすいインターフェイスを提供します。 これは、URL http:// localhost:8080/nifiからアクセスできます。
まず、NiFiサーバーで実行されるメインデータフローを定義します。
ここでは、ご覧のとおり、MiNiFiエージェントからデータを受信する入力ポートを定義しました。 さらに、InfluxDBへのデータの保存を担当するPutInfluxDBプロセッサへの接続を介してデータを送信します。 このプロセッサの構成では、InfluxDBの接続URLと、データを送信するデータベース名を定義しました。
4.5. MiNiFiデータフローの定義
次に、MiNiFiエージェントで実行されるデータフローを定義します。 NiFiと同じユーザーインターフェイスを使用し、データフローをテンプレートとしてエクスポートして、MiNiFiエージェントでこれを構成します。 MiNiFiエージェントのデータフローを定義しましょう。
ここでは、MQTTブローカーからのデータの取得を担当するConsumeMQTTプロセッサーを定義しました。 プロパティには、ブローカーURIとトピックフィルターが用意されています。 階層air-qualityの下で定義されたすべてのトピックからデータを取得しています。
また、リモートプロセスグループを定義し、それをConcumeMQTTプロセッサに接続しました。 リモートプロセスグループは、サイト間プロトコルを介してデータをNiFiにプッシュする役割を果たします。
このデータフローをテンプレートとして保存し、XMLファイルとしてダウンロードできます。 このファイルにconfig.xmlという名前を付けましょう。 これで、コンバーターツールキットを使用して、このテンプレートをXMLからYAMLに変換できます。これはMiNiFiエージェントが使用します。
<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml
これにより、 config.yml ファイルが作成され、NiFiサーバーのホストとポートを手動で追加する必要があります。
Input Ports:
- id: 19442f9d-aead-3569-b94c-1ad397e8291c
name: From MiNiFi
comment: ''
max concurrent tasks: 1
use compression: false
Properties: # Deviates from spec and will later be removed when this is autonegotiated
Port: 1026
Host Name: localhost
これで、このファイルをディレクトリに配置できます
ここでは、データフローを作成し、MiNiFiエージェントで構成するために多くの手作業を行っています。 これは、数百人のエージェントが遠隔地に存在する可能性がある実際のシナリオでは実用的ではありません。 ただし、前に見たように、MiNiFiC2サーバーを使用してこれを自動化できます。 ただし、これはこのチュートリアルの範囲外です。
4.6. データパイプラインのテスト
最後に、データパイプラインをテストする準備が整いました。 実際のセンサーを使用する自由がないため、小さなシミュレーションを作成します。 小さなJavaプログラムを使用してセンサーデータを生成します。
class Sensor implements Callable<Boolean> {
String city;
String station;
String pollutant;
String topic;
Sensor(String city, String station, String pollutant, String topic) {
this.city = city;
this.station = station;
this.pollutant = pollutant;
this.topic = topic;
}
@Override
public Boolean call() throws Exception {
MqttClient publisher = new MqttClient(
"tcp://localhost:1883", UUID.randomUUID().toString());
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);
IntStream.range(0, 10).forEach(i -> {
String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",
pollutant,
city,
station,
ThreadLocalRandom.current().nextDouble(0, 100));
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(0);
message.setRetained(true);
try {
publisher.publish(topic, message);
Thread.sleep(1000);
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
});
return true;
}
}
ここでは、 EclipsePahoJavaクライアントを使用してMQTTブローカーへのメッセージを生成しています。 シミュレーションを作成するために必要な数のセンサーを追加できます。
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Boolean>> sensors = Arrays.asList(
new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"),
new Simulation.Sensor("london", "central", "co", "air-quality/co"),
new Simulation.Sensor("london", "central", "so2", "air-quality/so2"),
new Simulation.Sensor("london", "central", "no2", "air-quality/no2"),
new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));
List<Future<Boolean>> futures = executorService.invokeAll(sensors);
すべてが正常に機能する場合は、InfluxDBデータベースのデータをクエリできます。
たとえば、データベース「airquality」では、測定値「ozone」に属するすべてのポイントを確認できます。
5. 結論
要約すると、このチュートリアルでは基本的なIoTのユースケースについて説明しました。 また、MQTT、NiFi、InfluxDBなどのツールを使用してスケーラブルなデータパイプラインを構築する方法も理解しました。 もちろん、これはIoTアプリケーションの全範囲を網羅しているわけではなく、データ分析のパイプラインを拡張する可能性は無限大です。
さらに、このチュートリアルで選択した例は、デモンストレーションのみを目的としています。 IoTアプリケーションの実際のインフラストラクチャとアーキテクチャは、非常に多様で複雑になる可能性があります。 さらに、実用的な洞察をコマンドとして後方にプッシュすることで、フィードバックサイクルを完了することができます。