1. 序章

このチュートリアルでは、ファンアウトの概念と、 SpringAMQPおよびRabbitMQとのトピック交換について説明します。

大まかに言えば、ファンアウト交換すべてのバインドされたキューに同じメッセージをブロードキャストしますが、トピック交換はルーティングキーを使用してメッセージを特定のバインドされた1つまたは複数のキュー

このチュートリアルでは、 Messaging With SpringAMQPを事前に読むことをお勧めします。

2. ファンアウト交換の設定

2つのキューがバインドされた1つのファンアウト交換を設定しましょう。 この取引所にメッセージを送信すると、両方のキューがメッセージを受信します。 ファンアウト交換では、メッセージに含まれているルーティングキーはすべて無視されます。

Spring AMQPを使用すると、キュー、交換、およびバインディングのすべての宣言をDeclarablesオブジェクトに集約できます。

@Bean
public Declarables fanoutBindings() {
    Queue fanoutQueue1 = new Queue("fanout.queue1", false);
    Queue fanoutQueue2 = new Queue("fanout.queue2", false);
    FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");

    return new Declarables(
      fanoutQueue1,
      fanoutQueue2,
      fanoutExchange,
      BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
      BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}

3. トピック交換の設定

次に、それぞれ異なるバインディングパターンを持つ2つのキューを使用してトピック交換を設定します。

@Bean
public Declarables topicBindings() {
    Queue topicQueue1 = new Queue(topicQueue1Name, false);
    Queue topicQueue2 = new Queue(topicQueue2Name, false);

    TopicExchange topicExchange = new TopicExchange(topicExchangeName);

    return new Declarables(
      topicQueue1,
      topicQueue2,
      topicExchange,
      BindingBuilder
        .bind(topicQueue1)
        .to(topicExchange).with("*.important.*"),
      BindingBuilder
        .bind(topicQueue2)
        .to(topicExchange).with("#.error"));
}

トピック交換により、異なるキーパターンでキューをバインドできます。これは非常に柔軟性があり、同じパターンの複数のキュー、または同じキューに複数のパターンをバインドできます。

メッセージのルーティングキーがパターンと一致すると、メッセージはキューに入れられます。 キューにメッセージのルーティングキーと一致する複数のバインディングがある場合、メッセージの1つのコピーのみがキューに配置されます。

バインディングパターンでは、アスタリスク( “*”)を使用して特定の位置の単語に一致させるか、ポンド記号( “#”)を使用して0個以上の単語に一致させることができます。

したがって、 topicQueue1 は、中間ワードが「重要」である3ワードパターンのルーティングキーを持つメッセージを受信します。たとえば、「user.important.error」または[ X192X]「blog.important.notification」。

また、 topicQueue2 は、エラーという単語で終わるルーティングキーを持つメッセージを受信します。 一致する例は、「error」「user.important.error」、または「blog.post.save.error」です。

4. プロデューサーの設定

RabbitTemplateconvertAndSendメソッドを使用して、サンプルメッセージを送信します。

    String message = " payload is broadcast";
    return args -> {
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, 
            "topic important warn" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, 
            "topic important error" + message);
    };

RabbitTemplate は、さまざまな交換タイプに対して、多くのオーバーロードされた convertAndSend()メソッドを提供します。

ファンアウト交換にメッセージを送信すると、ルーティングキーは無視され、メッセージはすべてのバインドされたキューに渡されます。

トピック交換にメッセージを送信するときは、ルーティングキーを渡す必要があります。 このルーティングキーに基づいて、メッセージは特定のキューに配信されます。

5. コンシューマーの構成

最後に、生成されたメッセージを取得するために、4つのコンシューマー(キューごとに1つ)を設定しましょう。

    @RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
    public void receiveMessageFromFanout1(String message) {
        System.out.println("Received fanout 1 message: " + message);
    }

    @RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
    public void receiveMessageFromFanout2(String message) {
        System.out.println("Received fanout 2 message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
    public void receiveMessageFromTopic1(String message) {
        System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
    public void receiveMessageFromTopic2(String message) {
        System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
    }

@RabbitListenerアノテーションを使用してコンシューマーを構成します。ここで渡される引数はキューの名前のみです。 消費者はここで交換やルーティングキーを認識していません。

6. 例の実行

サンプルプロジェクトはSpringBootアプリケーションであるため、RabbitMQへの接続とともにアプリケーションを初期化し、すべてのキュー、交換、およびバインディングをセットアップします。

デフォルトでは、アプリケーションはポート5672のローカルホストで実行されているRabbitMQインスタンスを想定しています。 これと他のデフォルトはapplication.yamlで変更できます。

私たちのプロジェクトは、リクエスト本文にメッセージを含むPOSTを受け入れるURI – / broadcast –でHTTPエンドポイントを公開します。

本文が「Test」のこのURIにリクエストを送信すると、出力に次のようなものが表示されます。

Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast

もちろん、これらのメッセージが表示される順序は保証されていません。

7. 結論

このクイックチュートリアルでは、SpringAMQPおよびRabbitMQとのファンアウトおよびトピック交換について説明しました。

このチュートリアルの完全なソースコードとすべてのコードスニペットは、GitHubリポジトリで入手できます。