リアクティブアプリケーションでのSpringAMQP
1. 概要
このチュートリアルでは、AMQPメッセージング標準の一般的な実装であるRabbitMQメッセージングサーバーと統合する単純なSpringBootReactiveアプリケーションを作成する方法を示します。
両方のパターンの違いを強調する分散セットアップを使用して、ポイントツーポイントシナリオとパブリッシュ/サブスクライブシナリオの両方をカバーします。
AMQP、RabbitMQ、Spring Bootの基本的な知識、特にExchange、Queues、Topicsなどの重要な概念を前提としていることに注意してください。 これらの概念の詳細については、以下のリンクを参照してください。
2. RabbitMQサーバーのセットアップ
ローカルのRabbitMQをローカルにセットアップすることもできますが、実際には、高可用性、監視、セキュリティなどの追加機能を備えた専用のインストールを使用する可能性が高くなります。
開発マシンでこのような環境をシミュレートするために、Dockerを使用してアプリケーションが使用するサーバーを作成します。
次のコマンドは、スタンドアロンのRabbitMQサーバーを起動します。
$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3
永続ボリュームは宣言されていないため、再起動の間に未読メッセージは失われます。 このサービスは、ホストのポート5672で利用できます。
docker logs コマンドを使用してサーバーログを確認できます。これにより、次のような出力が生成されます。
$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
Starting RabbitMQ 3.7.5 on Erlang 20.3.5
Copyright (C) 2007-2018 Pivotal Software, Inc.
Licensed under the MPL. See http://www.rabbitmq.com/
## ##
## ## RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
########## Licensed under the MPL. See http://www.rabbitmq.com/
###### ##
########## Logs: <stdout>
Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
node : rabbit@rabbit
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : CY9rzUYh03PK3k6DJie09g==
log(s) : <stdout>
database dir : /var/lib/rabbitmq/mnesia/rabbit@rabbit
// ... more log lines
イメージにはrabbitmqctlユーティリティが含まれているため、実行中のイメージのコンテキストで管理タスクを実行するために使用できます。
たとえば、次のコマンドを使用してサーバーのステータス情報を取得できます。
$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
[{pid,299},
{running_applications,
[{rabbit,"RabbitMQ","3.7.5"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.5"},
// ... other info omitted for brevity
その他の便利なコマンドは次のとおりです。
- list_exchanges :宣言されたすべてのExchangeを一覧表示します
- list_queues :未読メッセージの数を含む、宣言されたすべてのキューを一覧表示します
- list_bindings :すべてのリストは、ルーティングキーも含め、交換とキュー間のバインディングを定義します
3. SpringAMQPプロジェクトのセットアップ
RabbitMQサーバーを稼働させたら、Springプロジェクトの作成に進むことができます。 このサンプルプロジェクトでは、RESTクライアントがSpringAMQPモジュールと対応するSpringBootスターターを使用してメッセージングサーバーと通信することにより、メッセージングサーバーにメッセージを投稿および/または受信できるようになります。
pom.xmlプロジェクトファイルに追加する必要がある主な依存関係は次のとおりです。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
spring-boot-starter-amqp は、AMQP関連のものをすべて提供しますが、 spring-boot-starter-webflux は、リアクティブRESTサーバーの実装に使用されるコア依存関係です。
注:MavenCentralでSpringBoot Starter AMQPおよびWebfluxモジュールの最新バージョンを確認できます。
4. シナリオ1:ポイントツーポイントメッセージング
この最初のシナリオでは、クライアントからメッセージを受信するブローカー内の論理エンティティである直接交換を使用します。
Direct Exchangeは、すべての着信メッセージを1つだけのキューにルーティングし、そこからクライアントが使用できるようにします。 複数のクライアントが同じキューにサブスクライブできますが、特定のメッセージを受信するのは1つだけです。
4.1. 交換とキューの設定
このシナリオでは、交換名とルーティングキーをカプセル化するDestinationInfoオブジェクトを使用します。 目的地名でキー設定された地図は、利用可能なすべての目的地を保存するために使用されます。
次の@PostConstructメソッドは、この初期設定を担当します。
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private DestinationsConfig destinationsConfig;
@PostConstruct
public void setupQueueDestinations() {
destinationsConfig.getQueues()
.forEach((key, destination) -> {
Exchange ex = ExchangeBuilder.directExchange(
destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder.durable(
destination.getRoutingKey())
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
});
}
このメソッドは、Springによって作成された adminAmqp beanを使用して、Exchange、キューを宣言し、指定されたルーティングキーを使用してそれらをバインドします。
すべての宛先は、 DestinationsConfig Beanから取得されます。これは、この例で使用されている@ConfigurationPropertiesクラスです。
このクラスには、application.yml構成ファイルから読み取られたマッピングから構築されたDestinationInfoオブジェクトが設定されたプロパティがあります。
4.2. プロデューサーエンドポイント
プロデューサーは、 HTTPPOSTを/queue /{name}の場所に送信してメッセージを送信します。
これはリアクティブエンドポイントであるため、Monoを使用して単純な確認応答を返します。
@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
// ... other members omitted
@Autowired
private AmqpTemplate amqpTemplate;
@PostMapping(value = "/queue/{name}")
public Mono<ResponseEntity<?>> sendMessageToQueue(
@PathVariable String name, @RequestBody String payload) {
DestinationInfo d = destinationsConfig
.getQueues().get(name);
if (d == null) {
return Mono.just(
ResponseEntity.notFound().build());
}
return Mono.fromCallable(() -> {
amqpTemplate.convertAndSend(
d.getExchange(),
d.getRoutingKey(),
payload);
return ResponseEntity.accepted().build();
});
}
最初に、nameパラメーターが有効な宛先に対応するかどうかを確認し、対応する場合は、自動配線された amqpTemplate インスタンスを使用して、ペイロード(単純な String メッセージ)をRabbitMQに実際に送信します。
4.3. MessageListenerContainerファクトリ
非同期でメッセージを受信するために、SpringAMQPはMessageContainerListener 抽象クラスを使用して、アプリケーションによって提供されるAMQPキューおよびリスナーからの情報フローを仲介します。
メッセージリスナーをアタッチするには、このクラスの具体的な実装が必要なので、コントローラーコードを実際の実装から分離するファクトリを定義します。
この場合、ファクトリメソッドは、 createMessageListenerContainer メソッドを呼び出すたびに、新しいSimpleMessageContainerListenerを返します。
@Component
public class MessageListenerContainerFactory {
@Autowired
private ConnectionFactory connectionFactory;
public MessageListenerContainerFactory() {}
public MessageListenerContainer createMessageListenerContainer(String queueName) {
SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
mlc.addQueueNames(queueName);
return mlc;
}
}
4.4. コンシューマーエンドポイント
コンシューマーは、プロデューサーが使用するのと同じエンドポイントアドレス( / queue / {name} )にアクセスしてメッセージを取得します。
このエンドポイントは、イベントの Flux を返します。ここで、各イベントは受信したメッセージに対応します。
@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;
@GetMapping(
value = "/queue/{name}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {
DestinationInfo d = destinationsConfig
.getQueues()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
MessageListenerContainer mlc = messageListenerContainerFactory
.createMessageListenerContainer(d.getRoutingKey());
Flux<String> f = Flux.<String> create(emitter -> {
mlc.setupMessageListener((MessageListener) m -> {
String payload = new String(m.getBody());
emitter.next(payload);
});
emitter.onRequest(v -> {
mlc.start();
});
emitter.onDispose(() -> {
mlc.stop();
});
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> "No news is good news")
.mergeWith(f);
}
宛先名の最初のチェック後、コンシューマーエンドポイントはMessageListenerContainerFactoryを使用してMessageListenerContainerを作成し、キュー名をレジストリから復元します。
MessageListenerContainer を取得したら、 create()ビルダーメソッドの1つを使用してメッセージFluxを作成します。
特定のケースでは、 FluxSink 引数をとるラムダを使用するものを使用します。これを使用して、SpringAMQPのリスナーベースの非同期APIをリアクティブアプリケーションにブリッジします。
また、エミッターの onRequest()および onDispose()コールバックに2つの追加のラムダをアタッチして、MessageListenerContainerが[ X210X]フラックスのライフサイクル。
最後に、結果の Flux をinterval()で作成された別のとマージします。これにより、5秒ごとに新しいイベントが作成されます。 これらのダミーメッセージは、この場合に重要な機能を果たします:これらがないと、メッセージを受信して送信に失敗したときにのみクライアントの切断を検出します。これは、特定のユースケースによっては時間がかかる場合があります。 。
4.5. テスト
コンシューマーエンドポイントとパブリッシャーエンドポイントの両方をセットアップすると、サンプルアプリケーションでいくつかのテストを実行できるようになります。
application.yml で、RabbitMQのサーバー接続の詳細と少なくとも1つの宛先を定義する必要があります。これは次のようになります。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
destinations:
queues:
NYSE:
exchange: nyse
routing-key: NYSE
spring.rabbitmq。*プロパティは、ローカルDockerコンテナで実行されているRabbitMQサーバーに接続するために必要な基本的なプロパティを定義します。 上記のIPは単なる例であり、特定の設定では異なる場合があることに注意してください。
キューはを使用して定義されます destinations.queues。
コマンドラインまたはIDEからサーバーを起動すると、メッセージの送受信を開始できます。 を使用しますカールユーティリティ、Windows、Mac、LinuxOSの両方で利用できる共通のユーティリティ。
次のリストは、宛先にメッセージを送信する方法と、サーバーから予想される応答を示しています。
$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact
このコマンドを実行した後、メッセージがRabbitMQによって受信され、次のコマンドを発行して使用できる状態になっていることを確認できます。
$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE 1
これで、次のコマンドを使用してcurlを使用してメッセージを読み取ることができます。
$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message
data:No news is good news...
... same message repeating every 5 secs
ご覧のとおり、最初に以前に保存されたメッセージを取得し、次に5秒ごとにダミーメッセージの受信を開始します。
キューを一覧表示するコマンドを再度実行すると、メッセージが保存されていないことがわかります。
$ docker execrabbitmqrabbitmqctllist_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE 0
5. シナリオ2:パブリッシュ/サブスクライブ
メッセージングアプリケーションのもう1つの一般的なシナリオは、単一のメッセージを複数のコンシューマーに送信する必要があるパブリッシュ/サブスクライブパターンです。
RabbitMQは、これらの種類のアプリケーションをサポートする2種類の交換を提供します。ファンアウトとトピックです。
これら2種類の主な違いは、後者ではルーティングキーパターンに基づいて受信するメッセージをフィルタリングできることです(例: 「alarm.mailserver。*」)は登録時に提供されますが、前者は着信メッセージをすべてのバインドされたキューに複製するだけです。
RabbitMQは、より複雑なメッセージフィルタリングを可能にするヘッダー交換もサポートしていますが、その使用はこの記事の範囲外です。
5.1. 目的地の設定
ポイントツーポイントのシナリオで行ったように、起動時に別の@PostConstructメソッドを使用してPub/Subの宛先を定義します。
唯一の違いは、 Exchanges のみを作成し、キューは作成しないことです。これらはオンデマンドで作成され、後でExchangeにバインドされます。各クライアント専用のキュー:
@PostConstruct
public void setupTopicDestinations(
destinationsConfig.getTopics()
.forEach((key, destination) -> {
Exchange ex = ExchangeBuilder
.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
});
}
5.2. パブリッシャーエンドポイント
クライアントは、接続されているすべてのクライアントに送信されるメッセージを投稿するために、 / topic /{name}の場所で利用可能なパブリッシャーエンドポイントを使用します。
前のシナリオと同様に、 @PostMapping を使用して、メッセージの送信後のステータスでMonoを返します。
@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
@PathVariable String name, @RequestBody String payload) {
DestinationInfo d = destinationsConfig
.getTopics()
.get(name);
if (d == null) {
return Mono.just(ResponseEntity.notFound().build());
}
return Mono.fromCallable(() -> {
amqpTemplate.convertAndSend(
d.getExchange(), d.getRoutingKey(),payload);
return ResponseEntity.accepted().build();
});
}
5.3. サブスクライバーエンドポイント
サブスクライバーエンドポイントは/topic / {name} に配置され、接続されたクライアントに対してFluxのメッセージを生成します。
これらのメッセージには、受信メッセージと5秒ごとに生成されるダミーメッセージの両方が含まれます。
@GetMapping(
value = "/topic/{name}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getTopics()
.get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound()
.build());
}
Queue topicQueue = createTopicQueue(d);
String qname = topicQueue.getName();
MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
Flux<String> f = Flux.<String> create(emitter -> {
mlc.setupMessageListener((MessageListener) m -> {
String payload = new String(m.getBody());
emitter.next(payload);
});
emitter.onRequest(v -> {
mlc.start();
});
emitter.onDispose(() -> {
amqpAdmin.deleteQueue(qname);
mlc.stop();
});
});
return Flux.interval(Duration.ofSeconds(5))
.map(v -> "No news is good news")
.mergeWith(f);
}
このコードは基本的に前のケースで見たものと同じですが、次の違いがあります。最初に、新しいサブスクライバーごとに新しいQueueを作成します。
これを行うには、 createTopicQueue()メソッドを呼び出します。このメソッドは、 DestinationInfo インスタンスからの情報を使用して、排他的で耐久性のないキューを作成し、それを[X198X ]設定されたルーティングキーを使用したExchange:
private Queue createTopicQueue(DestinationInfo destination) {
Exchange ex = ExchangeBuilder
.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder
.nonDurable()
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
return q;
}
Exchange を再度宣言しているにもかかわらず、起動時にすでに宣言しているため、RabbitMQは新しいものを作成しないことに注意してください。
2番目の違いは、 onDispose()メソッドに渡すラムダにあります。このメソッドは、サブスクライバーが切断したときにQueueも削除します。
5.3. テスト
Pub-Subシナリオをテストするには、最初に次のようにout application.ymlでトピックの宛先を定義する必要があります。
destinations:
## ... queue destinations omitted
topics:
weather:
exchange: alerts
routing-key: WEATHER
ここでは、 / topic /Weatherの場所で利用できるトピックエンドポイントを定義しました。 このエンドポイントは、「WEATHER」ルーティングキーを使用してRabbitMQの「アラート」交換にメッセージを投稿するために使用されます。
サーバーを起動した後、rabitmqctlコマンドを使用して交換が作成されたことを確認できます。
$ docker exec docker_rabbitmq_1 rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic topic
amq.fanout fanout
amq.match headers
amq.headers headers
direct
amq.rabbitmq.trace topic
amq.direct direct
alerts topic
ここで、 list_bindings コマンドを発行すると、「アラート」交換に関連するキューがないことがわかります。
$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange NYSE queue NYSE []
nyse exchange NYSE queue NYSE []
2つのコマンドシェルを開き、それぞれに次のコマンドを発行して、宛先にサブスクライブするいくつかのサブスクライバーを開始しましょう。
$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...
# ... same message repeating indefinitely
最後に、curlをもう一度使用して、サブスクライバーにアラートを送信します。
$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
* Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact
メッセージを送信すると、「ハリケーンが近づいています!」というメッセージがほぼ瞬時に表示されます。 各サブスクライバーのシェルで。
使用可能なバインディングを確認すると、サブスクライバーごとに1つのキューがあることがわかります。
$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange IBOV queue IBOV []
exchange NYSE queue NYSE []
exchange spring.gen-i0m0pbyKQMqpz2_KFZCd0g
queue spring.gen-i0m0pbyKQMqpz2_KFZCd0g []
exchange spring.gen-wCHALTsIS1q11PQbARJ7eQ
queue spring.gen-wCHALTsIS1q11PQbARJ7eQ []
alerts exchange spring.gen-i0m0pbyKQMqpz2_KFZCd0g
queue WEATHER []
alerts exchange spring.gen-wCHALTsIS1q11PQbARJ7eQ
queue WEATHER []
ibov exchange IBOV queue IBOV []
nyse exchange NYSE queue NYSE []
quotes exchange NYSE queue NYSE []
サブスクライバーのシェルでCtrl-Cを押すと、ゲートウェイは最終的にクライアントが切断されたことを検出し、それらのバインディングを削除します。
6. 結論
この記事では、spring-amqpモジュールを使用してRabbitMQサーバーと対話する単純なリアクティブアプリケーションを作成する方法を示しました。
ほんの数行のコードで、ポイントツーポイントとパブリッシュ/サブスクライブの両方の統合パターンをサポートする機能的なHTTP-to-AMQPゲートウェイを作成できました。これを簡単に拡張して、セキュリティなどの機能を追加できます。標準のSpring機能の追加。
この記事に示されているコードは、Githubでから入手できます。