1. 概要

このチュートリアルでは、EclipsePahoプロジェクトが提供するライブラリを使用してJavaプロジェクトにMQTTメッセージングを追加する方法を説明します。

2. MQTT入門書

MQTT(MQ Telemetry Transport)は、産業用アプリケーションで使用されるような低電力デバイスとの間でデータを転送するためのシンプルで軽量な方法の必要性に対処するために作成されたメッセージングプロトコルです。

IoT(Internet of Things)デバイスの人気が高まるにつれ、MQTTの使用が増え、OASISとISOによる標準化につながりました。

このプロトコルは、単一のメッセージングパターン、つまりパブリッシュ/サブスクライブパターンをサポートします。クライアントによって送信される各メッセージには、サブスクライブされたクライアントにルーティングするためにブローカーによって使用される関連する「トピック」が含まれます。 トピック名は、「oiltemp」のような単純な文字列またはパスのような文字列「motor/ 1 /rpm」にすることができます。

メッセージを受信するために、クライアントは、正確な名前またはサポートされているワイルドカードの1つを含む文字列(マルチレベルトピックの場合は「#」、シングルレベルの場合は「+」)を使用して1つ以上のトピックをサブスクライブします。

3. プロジェクトの設定

MavenプロジェクトにPahoライブラリを含めるには、次の依存関係を追加する必要があります。

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.0</version>
</dependency>

Eclipse PahoJavaライブラリモジュールの最新バージョンはMavenCentralからダウンロードできます。

4. クライアントのセットアップ

Pahoライブラリを使用する場合、MQTTブローカーからメッセージを送受信するために最初に行う必要があるのは、 IMqttClientインターフェースの実装を取得します 。 このインターフェースには、サーバーへの接続を確立し、メッセージを送受信するためにアプリケーションが必要とするすべてのメソッドが含まれています。

Pahoは、このインターフェイスの2つの実装、非同期1つ( MqttAsyncClient )と同期1つ( MqttClient )を備えています。 この場合、より単純なセマンティクスを持つ同期バージョンに焦点を当てます。

セットアップ自体は2段階のプロセスです。最初にMqttClientクラスのインスタンスを作成し、次にそれをサーバーに接続します。 次のサブセクションでは、これらの手順について詳しく説明します。

4.1. 新しいIMqttClientインスタンスの作成

次のコードスニペットは、新しいIMqttClient同期インスタンスを作成する方法を示しています。

String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

この場合、使用可能な最も単純なコンストラクターを使用しています。これは、MQTTブローカーのエンドポイントアドレスと、クライアントを一意に識別するクライアント識別子を取得します。

この例では、ランダムなUUIDを使用したため、実行のたびに新しいクライアント識別子が生成されます。

Pahoは、未確認メッセージの保存に使用される永続化メカニズムや、プロトコルエンジンの実装に必要なバックグラウンドタスクの実行に使用されるScheduledExecutorServiceをカスタマイズするために使用できる追加のコンストラクターも提供します。

使用しているサーバーエンドポイントは、PahoプロジェクトによってホストされているパブリックMQTTブローカーです。これにより、インターネットに接続している人なら誰でも、認証を必要とせずにクライアントをテストできます。

4.2. サーバーへの接続

新しく作成したMqttClientインスタンスがサーバーに接続されていません。 これを行うには、connect()メソッドを呼び出し、オプションで MqttConnectOptions インスタンスを渡して、プロトコルのいくつかの側面をカスタマイズできるようにします。

特に、これらのオプションを使用して、セキュリティクレデンシャル、セッションリカバリモード、再接続モードなどの追加情報を渡すことができます。

MqttConnectionOptions クラスは、これらのオプションを、通常のセッターメソッドを使用して設定できる単純なプロパティとして公開します。 シナリオに必要なプロパティを設定するだけで済みます。残りのプロパティはデフォルト値を想定しています。

サーバーへの接続を確立するために使用されるコードは、通常、次のようになります。

MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);

ここでは、接続オプションを次のように定義します。

  • ネットワーク障害が発生した場合、ライブラリは自動的にサーバーへの再接続を試みます
  • 前回の実行で送信されなかったメッセージは破棄されます
  • 接続タイムアウトは10秒に設定されています

5. メッセージの送信

すでに接続されているMqttClientを使用してメッセージを送信するのは非常に簡単です。 次のサービス品質オプションのいずれかを使用して、publish()メソッドバリアントの1つを使用して、ペイロード(常にバイト配列)を特定のトピックに送信します。

  • 0 –「せいぜい1回」のセマンティクス。「ファイアアンドフォーゲット」とも呼ばれます。 メッセージの損失が許容できる場合は、このオプションを使用します。これは、いかなる種類の確認応答や永続性も必要としないためです。
  • 1 –「少なくとも1回」のセマンティクス。 メッセージの損失が許容できないおよびサブスクライバーが重複を処理できる場合は、このオプションを使用します
  • 2 –「正確に1回」のセマンティクス。 メッセージの損失が許容できないおよびサブスクライバーが重複を処理できない場合は、このオプションを使用します

サンプルプロジェクトでは、 EngineTemperatureSensor クラスは、 call()メソッドを呼び出すたびに新しい温度測定値を生成する模擬センサーの役割を果たします。

このクラスはCallableインターフェースを実装しているため、java.util.concurrentパッケージで利用可能なExecutorService実装の1つで簡単に使用できます。

public class EngineTemperatureSensor implements Callable<Void> {

    // ... private members omitted
    
    public EngineTemperatureSensor(IMqttClient client) {
        this.client = client;
    }

    @Override
    public Void call() throws Exception {        
        if ( !client.isConnected()) {
            return null;
        }           
        MqttMessage msg = readEngineTemp();
        msg.setQos(0);
        msg.setRetained(true);
        client.publish(TOPIC,msg);        
        return null;        
    }

    private MqttMessage readEngineTemp() {             
        double temp =  80 + rnd.nextDouble() * 20.0;        
        byte[] payload = String.format("T:%04.2f",temp)
          .getBytes();        
        return new MqttMessage(payload);           
    }
}

MqttMessageは、ペイロード自体、要求されたサービス品質、およびメッセージの保持フラグもカプセル化します。このフラグは、サブスクライバーによって消費されるまでこのメッセージを保持する必要があることをブローカーに示します。

この機能を使用して「最後の既知の正常」動作を実装できるため、新しいサブスクライバーがサーバーに接続すると、保持されているメッセージをすぐに受信します。

6. メッセージの受信

MQTTブローカーからメッセージを受信するには、 subscribe()メソッドバリアントの1つを使用する必要があります。これにより、次のように指定できます。

  • 受信したいメッセージの1つ以上のトピックフィルター
  • 関連するQoS
  • 受信したメッセージを処理するためのコールバックハンドラ

次の例では、既存の IMqttClient インスタンスにメッセージリスナーを追加して、特定のトピックからメッセージを受信する方法を示します。  CountDownLatch をコールバックとメイン実行スレッド間の同期メカニズムとして使用し、新しいメッセージが到着するたびにデクリメントします。

サンプルコードでは、メッセージを受信するために別のIMqttClientインスタンスを使用しました。 どのクライアントが何を実行するかをより明確にするために行いましたが、これはPahoの制限ではありません。必要に応じて、メッセージの公開と受信に同じクライアントを使用できます。

CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
    byte[] payload = msg.getPayload();
    // ... payload handling omitted
    receivedSignal.countDown();
});    
receivedSignal.await(1, TimeUnit.MINUTES);

上記で使用されているsubscribe()バリアントは、2番目の引数としてIMqttMessageListenerインスタンスを取ります。

この例では、ペイロードを処理してカウンターをデクリメントする単純なラムダ関数を使用します。 指定された時間枠(1分)に十分なメッセージが到着しない場合、 await()メソッドは例外をスローします。

Pahoを使用する場合、メッセージの受信を明示的に確認する必要はありません。 コールバックが正常に戻った場合、Pahoはそれが正常に消費されたと見なし、サーバーに確認応答を送信します。

コールバックがExceptionをスローすると、クライアントはシャットダウンされます。 これにより、QoSレベル0で送信されたメッセージが失われることに注意してください。

QoSレベル1または2で送信されたメッセージは、クライアントが再接続されてトピックに再度サブスクライブすると、サーバーによって再送信されます。

7. 結論

この記事では、Eclipse Pahoプロジェクトが提供するライブラリを使用して、JavaアプリケーションでMQTTプロトコルのサポートを追加する方法を示しました。

このライブラリは、すべての低レベルプロトコルの詳細を処理し、メッセージの永続性など、内部機能の重要な側面をカスタマイズするための十分なスペースを残しながら、ソリューションの他の側面に集中できるようにします。

この記事に示されているコードは、GitHubから入手できます。