SpringAMQPを使用したRabbitMQメッセージディスパッチ
1. 序章
このチュートリアルでは、ファンアウトの概念と、 SpringAMQPおよびRabbitMQとのトピック交換について説明します。
大まかに言えば、ファンアウト交換はすべてのバインドされたキューに同じメッセージをブロードキャストしますが、トピック交換はルーティングキーを使用してメッセージを特定のバインドされた1つまたは複数のキュー。
このチュートリアルでは、 Messaging With SpringAMQPを事前に読むことをお勧めします。
2. ファンアウト交換の設定
2つのキューがバインドされた1つのファンアウト交換を設定しましょう。 この取引所にメッセージを送信すると、両方のキューがメッセージを受信します。 ファンアウト交換では、メッセージに含まれているルーティングキーはすべて無視されます。
Spring AMQPを使用すると、キュー、交換、およびバインディングのすべての宣言をDeclarablesオブジェクトに集約できます。
@Bean
public Declarables fanoutBindings() {
Queue fanoutQueue1 = new Queue("fanout.queue1", false);
Queue fanoutQueue2 = new Queue("fanout.queue2", false);
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
return new Declarables(
fanoutQueue1,
fanoutQueue2,
fanoutExchange,
BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}
3. トピック交換の設定
次に、それぞれ異なるバインディングパターンを持つ2つのキューを使用してトピック交換を設定します。
@Bean
public Declarables topicBindings() {
Queue topicQueue1 = new Queue(topicQueue1Name, false);
Queue topicQueue2 = new Queue(topicQueue2Name, false);
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
return new Declarables(
topicQueue1,
topicQueue2,
topicExchange,
BindingBuilder
.bind(topicQueue1)
.to(topicExchange).with("*.important.*"),
BindingBuilder
.bind(topicQueue2)
.to(topicExchange).with("#.error"));
}
トピック交換により、異なるキーパターンでキューをバインドできます。これは非常に柔軟性があり、同じパターンの複数のキュー、または同じキューに複数のパターンをバインドできます。
メッセージのルーティングキーがパターンと一致すると、メッセージはキューに入れられます。 キューにメッセージのルーティングキーと一致する複数のバインディングがある場合、メッセージの1つのコピーのみがキューに配置されます。
バインディングパターンでは、アスタリスク( “*”)を使用して特定の位置の単語に一致させるか、ポンド記号( “#”)を使用して0個以上の単語に一致させることができます。
したがって、 topicQueue1 は、中間ワードが「重要」である3ワードパターンのルーティングキーを持つメッセージを受信します。たとえば、「user.important.error」または[ X192X]「blog.important.notification」。
また、 topicQueue2 は、エラーという単語で終わるルーティングキーを持つメッセージを受信します。 一致する例は、「error」、「user.important.error」、または「blog.post.save.error」です。
4. プロデューサーの設定
RabbitTemplateのconvertAndSendメソッドを使用して、サンプルメッセージを送信します。
String message = " payload is broadcast";
return args -> {
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN,
"topic important warn" + message);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR,
"topic important error" + message);
};
RabbitTemplate は、さまざまな交換タイプに対して、多くのオーバーロードされた convertAndSend()メソッドを提供します。
ファンアウト交換にメッセージを送信すると、ルーティングキーは無視され、メッセージはすべてのバインドされたキューに渡されます。
トピック交換にメッセージを送信するときは、ルーティングキーを渡す必要があります。 このルーティングキーに基づいて、メッセージは特定のキューに配信されます。
5. コンシューマーの構成
最後に、生成されたメッセージを取得するために、4つのコンシューマー(キューごとに1つ)を設定しましょう。
@RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
public void receiveMessageFromFanout1(String message) {
System.out.println("Received fanout 1 message: " + message);
}
@RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
public void receiveMessageFromFanout2(String message) {
System.out.println("Received fanout 2 message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
public void receiveMessageFromTopic1(String message) {
System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
}
@RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
public void receiveMessageFromTopic2(String message) {
System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
}
@RabbitListenerアノテーションを使用してコンシューマーを構成します。ここで渡される引数はキューの名前のみです。 消費者はここで交換やルーティングキーを認識していません。
6. 例の実行
サンプルプロジェクトはSpringBootアプリケーションであるため、RabbitMQへの接続とともにアプリケーションを初期化し、すべてのキュー、交換、およびバインディングをセットアップします。
デフォルトでは、アプリケーションはポート5672のローカルホストで実行されているRabbitMQインスタンスを想定しています。 これと他のデフォルトはapplication.yamlで変更できます。
私たちのプロジェクトは、リクエスト本文にメッセージを含むPOSTを受け入れるURI – / broadcast –でHTTPエンドポイントを公開します。
本文が「Test」のこのURIにリクエストを送信すると、出力に次のようなものが表示されます。
Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast
もちろん、これらのメッセージが表示される順序は保証されていません。
7. 結論
このクイックチュートリアルでは、SpringAMQPおよびRabbitMQとのファンアウトおよびトピック交換について説明しました。
このチュートリアルの完全なソースコードとすべてのコードスニペットは、GitHubリポジトリで入手できます。