1.はじめに

この記事では、https://projects.spring.io/spring-amqp/[Spring AMQP]およびhttp://www.rabbitmq.com/[RabbitMQ]を使用して、

fanout

の概念とトピックの交換について説明します。

高レベルでは、ファンアウト交換は同じメッセージをすべてのバインドキューにブロードキャストしますが、トピック交換は特定のバインドキューにメッセージを渡すためにルーティングキーを使用します。

この記事では、

Spring AMQPを使用したメッセージング

の事前のお読みをお勧めします。

2.ファンアウト交換の設定

2つのバウンドキューを持つファンアウト交換を1つ設定しましょう。

Spring AMQPを使用すると、コレクション内のJava構成からキュー、交換、およびバインディングのすべての宣言を集約し、

List <Declarable>

として返すことができます。

@Bean
public List<Declarable> fanoutBindings() {
    Queue fanoutQueue1 = new Queue("fanout.queue1", false);
    Queue fanoutQueue2 = new Queue("fanout.queue2", false);
    FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");

    return Arrays.asList(
      fanoutQueue1,
      fanoutQueue2,
      fanoutExchange,
      bind(fanoutQueue1).to(fanoutExchange),
      BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}

この設定では、両方のキューがこの交換に送信されたすべてのメッセージを受け取ることになります。

3.トピック交換の設定

また、それぞれ特定のルーティングキーを持つ2つのバインドキューを使用してトピック交換を設定します。

@Bean
public List<Declarable> topicBindings() {
    Queue topicQueue1 = new Queue(topicQueue1Name, false);
    Queue topicQueue2 = new Queue(topicQueue2Name, false);

    TopicExchange topicExchange = new TopicExchange(topicExchangeName);

    return Arrays.asList(
      topicQueue1,
      topicQueue2,
      topicExchange,
      BindingBuilder
        .bind(topicQueue1)
        .to(topicExchange).with("** .important.** "),
      BindingBuilder
        .bind(topicQueue2)
        .to(topicExchange).with("#.error"));
}

この設定では、

topicQueue1

は、ルーティングキーが3ワードパターンで、中間ワードが「重要」であるすべてのメッセージを取得すると想定します。 .__

2番目のキューは、ルーティングキーが誤って終了しているすべてのメッセージを受け取ります。一致する例は

“ user.important.error”

または__“ blog.post.save.error”です。

4.プロデューサーの設定

設定した交換を使用するためのメッセージプロデューサを作成する必要があります。

BroadcastMessageProducer

クラスは

RabbitTemplate

とそのメソッド

convertAndSend

を使ってメッセージを送信します。

@Component
public class BroadcastMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessages(String message) {
        rabbitTemplate.convertAndSend(
          SpringAmqpConfig.fanoutExchangeName, "", message);
        rabbitTemplate.convertAndSend(
          SpringAmqpConfig.topicExchangeName, "user.not-important.info", message);
        rabbitTemplate.convertAndSend(
          SpringAmqpConfig.topicExchangeName, "user.important.error", message);
    }
}


  • RabbitTemplate

    は、さまざまな交換タイプのために多くのオーバーロードされた

    convertAndSend()

    メソッドを提供します。

ファンアウト交換にメッセージを送信すると、ルーティングキーは無視され、メッセージはすべてのバインドキューに渡されます。

トピック交換にメ​​ッセージを送信するときには、ルーティングキーを渡す必要があります。このルーティングキーに基づいて、メッセージは特定のキューに配信されます。

5.コンシューマの設定

最後に、生成されたメッセージを受け取るために、4つのコンシューマ(各キューに1つ)を設定しましょう。

@Component
public class BroadcastMessageConsumers {

    @RabbitListener(queues = {SpringAmqpConfig.fanoutQueue1Name})
    public void receiveMessageFromFanout1(String message) {
    }

    @RabbitListener(queues = {SpringAmqpConfig.fanoutQueue2Name})
    public void receiveMessageFromFanout2(String message) {
    }

    @RabbitListener(queues = {SpringAmqpConfig.topicQueue1Name})
    public void receiveMessageFromTopic1(String message) {
    }

    @RabbitListener(queues = {SpringAmqpConfig.topicQueue2Name})
    public void receiveMessageFromTopic2(String message) {
    }
}


@ RabbitListener

アノテーションを使ってコンシューマを設定します。ここで渡される唯一の引数はキューの名前です。消費者はここで交換やルーティングキーを意識していません。

6.例を実行する

私たちのサンプルプロジェクトはSpring Bootアプリケーションなので、RabbitMQへの接続とともにアプリケーションを初期化し、すべてのキュー、交換、およびバインディングを設定します。

デフォルトでは、このアプリケーションはポート5672のlocalhostでRabbitMQインスタンスが実行されることを想定しています。デフォルトは

application.yaml

で変更できます。

私たちのプロジェクトは、リクエストボディにメッセージを入れたPOST呼び出しを受け付けるURI「/broadcast」でHTTPエンドポイントを公開します。

ボディ「Test」でこのURIにリクエストを送信すると、ログ出力に次のような内容が表示されるはずです。

2017-04-14 12:27:59.611  INFO 4534 ---[cTaskExecutor-1]c.b.springamqpsimple.MessageConsumers    : Received fanout 1 message:Test
2017-04-14 12:27:59.611  INFO 4534 ---[cTaskExecutor-1]c.b.springamqpsimple.MessageConsumers    : Received topic 2 message: Test
2017-04-14 12:27:59.611  INFO 4534 ---[cTaskExecutor-1]c.b.springamqpsimple.MessageConsumers    : Received fanout 2 message: Test
2017-04-14 12:27:59.611  INFO 4534 ---[cTaskExecutor-1]c.b.springamqpsimple.MessageConsumers    : Received topic 1 message: Test
2017-04-14 12:27:59.612  INFO 4534 ---[cTaskExecutor-1]c.b.springamqpsimple.MessageConsumers    : Received topic 2 message: Test

これらのメッセージが表示される順序は、もちろん保証されていません。

7.まとめ

このクイックチュートリアルでは、Spring AMQPとRabbitMQとのファンアウトとトピックの交換について説明しました。

この記事の完全なソースコードとすべてのコードスニペットは、https://github.com/eugenp/tutorials/tree/master/spring-amqp-simple[GitHub repository]から入手できます。