NatsJavaクライアントを使用したメッセージの公開と受信
1. 概要
このチュートリアルでは、NAT用のJavaクライアントを使用してNATSサーバーに接続し、メッセージを送受信します。
NATSは、メッセージ交換の3つの主要なモードを提供します。 パブリッシュ/サブスクライブセマンティクスは、トピックのすべてのサブスクライバーにメッセージを配信します。 リクエスト/リプライメッセージングは、トピックを介してリクエストを送信し、レスポンスをリクエスターにルーティングします。
サブスクライバーは、トピックをサブスクライブするときにメッセージキューグループに参加することもできます。 関連するトピックに送信されたメッセージは、キューグループ内の1人のサブスクライバーにのみ配信されます。
2. 設定
2.1. Mavenの依存関係
まず、NATSライブラリを pom.xml:に追加する必要があります。
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>1.0</version>
</dependency>
ライブラリの最新バージョンはここにあり、Githubプロジェクトはここです。
2.2. NATSサーバー
次に、メッセージを交換するためのNATSサーバーが必要です。 すべての主要なプラットフォームの説明がありますここにあります。
localhost:4222で実行されているサーバーがあると想定します。
3. メッセージの接続と交換
3.1. NATSに接続する
静的NATSクラスのconnect()メソッドは、Connectionsを作成します。
デフォルトのオプションで接続を使用し、ポート4222のローカルホストでリッスンする場合は、デフォルトの方法を使用できます。
Connection natsConnection = Nats.connect();
ただし、接続には多くの構成可能なオプションがあり、そのうちのいくつかをオーバーライドする必要があります。
Options オブジェクトを作成し、それをNatsに渡します。
private Connection initConnection() {
Options options = new Options.Builder()
.errorCb(ex -> log.error("Connection Exception: ", ex))
.disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
.reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
.build();
return Nats.connect(uri, options);
}
NATS接続は永続的です。APIは失われた接続の再接続を試みます。
切断が発生したときと接続が復元されたときに通知するコールバックをインストールしました。 この例ではラムダを使用していますが、単にイベントをログに記録する以上のことを行う必要があるアプリケーションの場合、必要なインターフェイスを実装するオブジェクトをインストールできます。
簡単なテストを実行できます。 接続を作成し、60秒間スリープを追加して、プロセスを実行し続けます。
Connection natsConnection = initConnection();
Thread.sleep(60000);
これを実行します。 次に、NATSサーバーを停止して開始します。
[jnats-callbacks] ERROR com.baeldung.nats.NatsClient
- Channel disconnected: io.nats.client.ConnectionImpl@79428dc1
[reconnect] WARN io.nats.client.ConnectionImpl
- couldn't connect to nats://localhost:4222 (nats: connection read error)
[jnats-callbacks] ERROR com.baeldung.nats.NatsClient
- Reconnected to server: io.nats.client.ConnectionImpl@79428dc1
コールバックが切断と再接続をログに記録していることがわかります。
3.2. メッセージを購読する
接続が確立されたので、メッセージ処理に取り掛かることができます。
NATS メッセージは、バイト[]の配列のコンテナーです。 予想されるsetData(byte [])および byte [] getData()メソッドに加えて、メッセージの宛先を設定および取得し、トピックに返信するためのメソッドがあります。
文字列であるトピックを購読します。
NATSは、同期サブスクリプションと非同期サブスクリプションの両方をサポートします。
非同期サブスクリプションを見てみましょう。
AsyncSubscription subscription = natsConnection
.subscribe( topic, msg -> log.info("Received message on {}", msg.getSubject()));
APIは、メッセージをスレッド内の MessageHandler()、に配信します。
一部のアプリケーションでは、代わりにメッセージを処理するスレッドを制御したい場合があります。
SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);
SyncSubscription には、指定されたミリ秒数の間ブロックするブロッキング nextMessage()メソッドがあります。 テストケースを単純にするために、テストには同期サブスクリプションを使用します。
AsyncSubscriptionとSyncSubscriptionの両方に、サブスクリプションを閉じるために使用できる unsubscribe()メソッドがあります。
subscription.unsubscribe();
3.3. メッセージを公開する
メッセージの公開はいくつかの方法で行うことができます。
最も単純な方法では、トピックStringとメッセージbytesのみが必要です。
natsConnection.publish("foo.bar", "Hi there!".getBytes());
サイト運営者が返信を希望する場合、またはメッセージの送信元に関する特定の情報を提供する場合は、返信先トピックを含むメッセージを送信することもできます。
natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());
bytesの代わりにMessageを渡すなど、他のいくつかの組み合わせにもオーバーロードがあります。
3.4. シンプルなメッセージ交換
有効な接続が与えられると、メッセージ交換を検証するテストを作成できます。
SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());
Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
natsConnection
.publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());
message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));
まず、同期サブスクリプションを使用して2つのトピックをサブスクライブします。これらは、JUnitテスト内ではるかにうまく機能するためです。 次に、一方にメッセージを送信し、もう一方をreplyToアドレスとして指定します。
最初の宛先からのメッセージを読んだ後、トピックを「反転」して応答を送信します。
3.5. ワイルドカードサブスクリプション
NATSサーバーはトピックワイルドカードをサポートします。
ワイルドカードは、「。」で区切られたトピックトークンを操作します。 キャラクター。 アスタリスク文字「*」は、個々のトークンと一致します。 大なり記号’>’は、トピックの残りの部分のワイルドカード一致であり、複数のトークンである可能性があります。
例えば:
- foo。*はfoo.bar、foo.requests、と一致しますが、foo.bar.requestsとは一致しません
- foo。>は、foo.bar、foo.requests、foo.bar.requests、foo.bar.baeldungなどに一致します。
いくつかのテストを試してみましょう。
SyncSubscription fooSubscription = client.subscribeSync("foo.*");
client.publishMessage("foo.bar", "bar.foo", "hello there");
Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);
SyncSubscription barSubscription = client.subscribeSync("foo.>");
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
4. リクエスト/返信メッセージング
私たちのメッセージ交換テストは、pub/subメッセージングシステムの一般的なイディオムに似ていました。 リクエスト/リプライ。 NATSは、この要求/応答メッセージングを明示的にサポートしています。
パブリッシャーは、上記で使用した非同期サブスクリプション方式を使用して、リクエストのハンドラーをインストールできます。
AsyncSubscription subscription = natsConnection
.subscribe("foo.bar.requests", new MessageHandler() {
@Override
public void onMessage(Message msg) {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
}
});
または、到着時にリクエストに応答することもできます。
APIは、 request()メソッドを提供します。
Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);
このメソッドは、応答用の一時的なメールボックスを作成し、返信先アドレスを書き込みます。
Request()は応答を返し、要求がタイムアウトした場合はnullを返します。 最後の引数は、待機するミリ秒数です。
リクエスト/返信用にテストを変更できます。
natsConnection.subscribe(salary.requests", message -> {
natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.", 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));
5. メッセージキュー
サブスクライバーは、サブスクリプション時にキューグループを指定できます。 メッセージがグループに公開されると、NATSはそのメッセージを唯一無二のサブスクライバーに配信します。
キューグループはメッセージを永続化しません。使用可能なリスナーがない場合、メッセージは破棄されます。
5.1. キューへのサブスクライブ
サブスクライバーは、キューグループ名を String:として指定します
SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");
もちろん、非同期バージョンもあります。
SyncSubscription subscription = natsConnection
.subscribe("topic", "queue name", new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});
サブスクリプションは、NATSサーバー上にキューを作成します。
5.2. キューへの公開
キューグループにメッセージを公開するには、関連するトピックに公開する必要があります。
natsConnection.publish("foo", "queue message".getBytes());
NATSサーバーはメッセージをキューにルーティングし、メッセージ受信者を選択します。
これはテストで確認できます。
SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");
natsConnection.publish("foo", "foobar".getBytes());
List<Message> messages = new ArrayList<>();
Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);
message = queue2.nextMessage(200);
if (message != null) messages.add(message);
assertEquals(1, messages.size());
メッセージは1つだけです。
最初の2行を通常のサブスクリプションに変更すると、次のようになります。
SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");
メッセージが両方のサブスクライバーに配信されるため、テストは失敗します。
6. 結論
この簡単な紹介では、NATSサーバーに接続し、pub/subメッセージと負荷分散キューメッセージの両方を送信しました。 ワイルドカードサブスクリプションのNATSサポートを確認しました。 また、リクエスト/リプライメッセージングも使用しました。
コードサンプルは、いつものように、GitHubのにあります。