1概要

このチュートリアルでは、https://github.com/nats-io/java-nats[Java Client for NAT]を使用してhttps://nats.io/documentation/[NATS Server]に接続して公開します。メッセージを受信します。

NATSは3つの主要なメッセージ交換モードを提供します。 Publish/Subscribeセマンティクスは、トピックのすべての購読者にメッセージを配信します。要求/応答メッセージングは​​トピックを介して要求を送信し、応答を要求者に送り返します。

購読者は、トピックを購読するときにメッセージキューグループに参加することもできます。関連トピックに送信されたメッセージは、キューグループ内の1人のサブスクライバーにのみ配信されます。


2セットアップ


2.1. Mavenの依存関係

まず、NATSライブラリを__pom.xmlに追加する必要があります。

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>1.0</version>
</dependency>

ライブラリhttps://search.maven.org/classic/#search%7Cga%7C1%7Ca%3A%22jnats%22[の最新バージョンはここにあります]、そしてGithubプロジェクトはhttps://githubです。 com/nats-io/java-nats[ここ]。


2.2. NATSサーバー

次に、メッセージをやり取りするためにNATSサーバーが必要です。

here.html

すべての主要なプラットフォームについての説明があります。

localhost:4222でサーバーが実行されているとします。


3接続メッセージと交換メッセージ


3.1. NATSに接続する

  • 静的NATSクラスの

    connect()

    メソッドは

    Connections

    を作成します。

デフォルトのオプションで接続を使用し、ポート4222のlocalhostで待機したい場合は、デフォルトの方法を使用できます。

Connection natsConnection = Nats.connect();

しかし

Connections

には多くの設定可能なオプションがあり、そのうちのいくつかを上書きしたいと思います。


Options

オブジェクトを作成して

Nats

に渡します。

private Connection initConnection() {
    Options options = new Options.Builder()
      .errorCb(ex -> log.error("Connection Exception: ", ex))
      .disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
      .reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
      .build();

    return Nats.connect(uri, options);
}

  • NATS

    Connections

    は耐久性があります** APIは失われた接続を再接続しようとします。

切断が発生したときと接続が回復したときを通知するためのコールバックをインストールしました。この例では、ラムダを使用していますが、単にイベントをログに記録する以上のことを行う必要があるアプリケーションには、必要なインターフェイスを実装するオブジェクトをインストールできます。

簡単なテストを実行できます。接続を作成し、プロセスを実行し続けるために60秒間スリープさせます。

Connection natsConnection = initConnection();
Thread.sleep(60000);

これを実行してください。その後、NATSサーバーを停止して起動します。

----[jnats-callbacks]ERROR com.baeldung.nats.NatsClient
  - Channel disconnected:[email protected][reconnect]WARN io.nats.client.ConnectionImpl
  - couldn't connect to nats://localhost:4222 (nats: connection read error)[jnats-callbacks]ERROR com.baeldung.nats.NatsClient
  - Reconnected to server:[email protected]----

コールバックが切断をログに記録して再接続するのを確認できます。


3.2. メッセージを購読する

接続ができたので、メッセージ処理に取り掛かります。

NATS

Message

は、

bytes[]

の配列のコンテナです。予想される

setData(byte[])

メソッドと

byte[]getData()

メソッドに加えて、メッセージの送信先を設定および取得し、トピックに返信するためのメソッドがあります。


Strings.

というトピックを購読しています。

NATSは同期と非同期の両方のサブスクリプションをサポートしています。

非同期サブスクリプションを見てみましょう。

AsyncSubscription subscription = natsConnection
  .subscribe( topic, msg -> log.info("Received message on {}", msg.getSubject()));

APIはスレッド内で

Messages

を__MessageHandler()に渡します。

アプリケーションによっては、代わりにメッセージを処理するスレッドを制御したい場合があります。

SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);


SyncSubscription

には、指定されたミリ秒数の間ブロックする

nextMessage()

ブロッキングメソッドがあります。テストケースを単純にするために、テストには同期サブスクリプションを使用します。


AsyncSubscription



SyncSubscription

には、どちらも購読を閉じるために使用できる

unsubscribe()

メソッドがあります。

subscription.unsubscribe();


3.3. メッセージを公開


Messages

の公開はいくつかの方法で行うことができます。

最も簡単な方法は、トピック

String

とメッセージ

bytes

のみを必要とします。

natsConnection.publish("foo.bar", "Hi there!".getBytes());

パブリッシャーが返信を希望する場合、またはメッセージの送信元に関する特定の情報を提供する場合は、返信トピックを含むメッセージを送信することもできます。

natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());


bytes

ではなく

Message

を渡すなど、他のいくつかの組み合わせでもオーバーロードがあります。


3.4. 簡単なメッセージ交換

有効な

Connection

が与えられたら、メッセージ交換を検証するテストを書くことができます。

SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());

Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

natsConnection
  .publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());

message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));

同期サブスクリプションを使用して2つのトピックをサブスクライブすることから始めます。なぜなら、それらはJUnitテストの中でよりうまく機能するからです。次に、一方を

replyTo

アドレスとして指定して、一方にメッセージを送信します。

最初の宛先からメッセージを読んだ後、トピックを「反転」して応答を送信します。


3.5. ワイルドカードサブスクリプション

NATSサーバーはトピックのワイルドカードをサポートします。

ワイルドカードは、「。」文字で区切られたトピックトークンを操作します。アスタリスク文字「** 」は個々のトークンと一致します。大なり記号「>」は、トピックの残りの部分に対するワイルドカードの一致です。これは、複数のトークンである場合があります。

例えば:

  • foo。

    はfoo.bar、foo.requestsに一致しますが、

    foo.bar.requests


    には一致しません

  • foo。> foo.bar、foo.requests、foo.bar.requestsと一致します。

foo.bar.baeldungなど

いくつかテストしてみましょう。

SyncSubscription fooSubscription = client.subscribeSync("foo.** ");

client.publishMessage("foo.bar", "bar.foo", "hello there");

Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);

SyncSubscription barSubscription = client.subscribeSync("foo.>");

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");

message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));


4メッセージングの要求/返信

私たちのメッセージ交換テストは、pub/subメッセージングシステムにおける一般的な慣用句に似ていました。リクエスト/返信

NATSはこのリクエスト/リプライメッセージング

を明示的にサポートしています。

パブリッシャーは、上記で使用した非同期サブスクリプション方式を使用して、リクエスト用のハンドラーをインストールできます。

AsyncSubscription subscription = natsConnection
  .subscribe("foo.bar.requests", new MessageHandler() {
    @Override
    public void onMessage(Message msg) {
        natsConnection.publish(message.getReplyTo(), reply.getBytes());
    }
});

または、到着したときにリクエストに応答することもできます。

APIは

request()

メソッドを提供します。

Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);

このメソッドは、応答用の一時メールボックスを作成し、返信先アドレスを書き込みます。


Request()

はレスポンスを返します。リクエストがタイムアウトした場合は

null

を返します。

最後の引数は待機するミリ秒数です。

リクエスト/リプライ用にテストを変更することができます。

natsConnection.subscribe(salary.requests", message -> {
    natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.", 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));


5メッセージキュー

加入者は、加入時にキューグループを指定できます。

メッセージがグループに発行されると、NATSはそれを唯一の加入者に配信します

  • キューグループはメッセージを永続化しません** 利用可能なリスナーがいない場合、メッセージは破棄されます。


5.1. キューへの登録

サブスクライバは、キューグループ名を__Stringとして指定します。

SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");

もちろん非同期版もあります。

SyncSubscription subscription = natsConnection
  .subscribe("topic", "queue name", new MessageHandler() {
    @Override
    public void onMessage(Message msg) {
        log.info("Received message on {}", msg.getSubject());
    }
});

サブスクリプションはNATSサーバー上にキューを作成します。

[[publish queues]]


5.2. キューへの公開

メッセージをキューグループにパブリッシュするには、単に関連トピックにパブリッシュする必要があります。

natsConnection.publish("foo",  "queue message".getBytes());

NATSサーバーはメッセージをキューにルーティングし、メッセージ受信者を選択します。

テストでこれを確認できます。

SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");

natsConnection.publish("foo", "foobar".getBytes());

List<Message> messages = new ArrayList<>();

Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);

message = queue2.nextMessage(200);
if (message != null) messages.add(message);

assertEquals(1, messages.size());

メッセージは1つだけです。

最初の2行を通常の購読に変更すると、

SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");

メッセージが両方のユーザに配信されるため、テストは失敗します。


6. 結論

この簡単な紹介では、NATSサーバーに接続し、pub/subメッセージと負荷分散キューメッセージの両方を送信しました。ワイルドカードサブスクリプションに対するNATSのサポートを調べました。また、要求/返信メッセージングも使用しました。

いつものように、コードサンプルはhttps://github.com/eugenp/tutorials/tree/master/libraries[over GitHub]にあります。