1. 概要

このチュートリアルでは、AMQPメッセージング標準の一般的な実装であるRabbitMQメッセージングサーバーと統合する単純なSpringBootReactiveアプリケーションを作成する方法を示します。

両方のパターンの違いを強調する分散セットアップを使用して、ポイントツーポイントシナリオとパブリッシュ/サブスクライブシナリオの両方をカバーします。

AMQP、RabbitMQ、Spring Bootの基本的な知識、特にExchange、Queues、Topicsなどの重要な概念を前提としていることに注意してください。 これらの概念の詳細については、以下のリンクを参照してください。

2. RabbitMQサーバーのセットアップ

ローカルのRabbitMQをローカルにセットアップすることもできますが、実際には、高可用性、監視、セキュリティなどの追加機能を備えた専用のインストールを使用する可能性が高くなります。

開発マシンでこのような環境をシミュレートするために、Dockerを使用してアプリケーションが使用するサーバーを作成します。

次のコマンドは、スタンドアロンのRabbitMQサーバーを起動します。

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

永続ボリュームは宣言されていないため、再起動の間に未読メッセージは失われます。 このサービスは、ホストのポート5672で利用できます。

docker logs コマンドを使用してサーバーログを確認できます。これにより、次のような出力が生成されます。

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : rabbit@rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@rabbit

// ... more log lines

イメージにはrabbitmqctlユーティリティが含まれているため、実行中のイメージのコンテキストで管理タスクを実行するために使用できます。

たとえば、次のコマンドを使用してサーバーのステータス情報を取得できます。

$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
[{pid,299},
 {running_applications,
     [{rabbit,"RabbitMQ","3.7.5"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.5"},
// ... other info omitted for brevity

その他の便利なコマンドは次のとおりです。

  • list_exchanges :宣言されたすべてのExchangeを一覧表示します
  • list_queues :未読メッセージの数を含む、宣言されたすべてのキューを一覧表示します
  • list_bindings :すべてのリストは、ルーティングキーも含め、交換とキュー間のバインディングを定義します

3. SpringAMQPプロジェクトのセットアップ

RabbitMQサーバーを稼働させたら、Springプロジェクトの作成に進むことができます。 このサンプルプロジェクトでは、RESTクライアントがSpringAMQPモジュールと対応するSpringBootスターターを使用してメッセージングサーバーと通信することにより、メッセージングサーバーにメッセージを投稿および/または受信できるようになります。

pom.xmlプロジェクトファイルに追加する必要がある主な依存関係は次のとおりです。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.0.2.RELEASE</version> 
</dependency>

spring-boot-starter-amqp は、AMQP関連のものをすべて提供しますが、 spring-boot-starter-webflux は、リアクティブRESTサーバーの実装に使用されるコア依存関係です。

注:MavenCentralでSpringBoot Starter AMQPおよびWebfluxモジュールの最新バージョンを確認できます。

4. シナリオ1:ポイントツーポイントメッセージング

この最初のシナリオでは、クライアントからメッセージを受信するブローカー内の論理エンティティである直接交換を使用します。

Direct Exchangeは、すべての着信メッセージを1つだけのキューにルーティングし、そこからクライアントが使用できるようにします。 複数のクライアントが同じキューにサブスクライブできますが、特定のメッセージを受信するのは1つだけです。

4.1. 交換とキューの設定

このシナリオでは、交換名とルーティングキーをカプセル化するDestinationInfoオブジェクトを使用します。 目的地名でキー設定された地図は、利用可能なすべての目的地を保存するために使用されます。

次の@PostConstructメソッドは、この初期設定を担当します。

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(
              destination.getExchange())
              .durable(true)
              .build();
            amqpAdmin.declareExchange(ex);
            Queue q = QueueBuilder.durable(
              destination.getRoutingKey())
              .build();
            amqpAdmin.declareQueue(q);
            Binding b = BindingBuilder.bind(q)
              .to(ex)
              .with(destination.getRoutingKey())
              .noargs();
            amqpAdmin.declareBinding(b);
        });
}

このメソッドは、Springによって作成された adminAmqp beanを使用して、Exchange、キューを宣言し、指定されたルーティングキーを使用してそれらをバインドします。

すべての宛先は、 DestinationsConfig Beanから取得されます。これは、この例で使用されている@ConfigurationPropertiesクラスです。

このクラスには、application.yml構成ファイルから読み取られたマッピングから構築されたDestinationInfoオブジェクトが設定されたプロパティがあります。

4.2. プロデューサーエンドポイント

プロデューサーは、 HTTPPOST/queue /{name}の場所に送信してメッセージを送信します。

これはリアクティブエンドポイントであるため、Monoを使用して単純な確認応答を返します。

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
 
    // ... other members omitted
 
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        DestinationInfo d = destinationsConfig
          .getQueues().get(name);
        if (d == null) {
            return Mono.just(
              ResponseEntity.notFound().build());
        }
    
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(
              d.getExchange(), 
              d.getRoutingKey(), 
              payload);  
            return ResponseEntity.accepted().build();
        });
    }

最初に、nameパラメーターが有効な宛先に対応するかどうかを確認し、対応する場合は、自動配線された amqpTemplate インスタンスを使用して、ペイロード(単純な String メッセージ)をRabbitMQに実際に送信します。

4.3. MessageListenerContainerファクトリ

非同期でメッセージを受信するために、SpringAMQPはMessageContainerListener 抽象クラスを使用して、アプリケーションによって提供されるAMQPキューおよびリスナーからの情報フローを仲介します。

メッセージリスナーをアタッチするには、このクラスの具体的な実装が必要なので、コントローラーコードを実際の実装から分離するファクトリを定義します。

この場合、ファクトリメソッドは、 createMessageListenerContainer メソッドを呼び出すたびに、新しいSimpleMessageContainerListenerを返します。

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

4.4. コンシューマーエンドポイント

コンシューマーは、プロデューサーが使用するのと同じエンドポイントアドレス( / queue / {name} )にアクセスしてメッセージを取得します。

このエンドポイントは、イベントの Flux を返します。ここで、各イベントは受信したメッセージに対応します。

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig
      .getQueues()
      .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
          .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
      });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

宛先名の最初のチェック後、コンシューマーエンドポイントはMessageListenerContainerFactoryを使用してMessageListenerContainerを作成し、キュー名をレジストリから復元します。

MessageListenerContainer を取得したら、 create()ビルダーメソッドの1つを使用してメッセージFluxを作成します。

特定のケースでは、 FluxSink 引数をとるラムダを使用するものを使用します。これを使用して、SpringAMQPのリスナーベースの非同期APIをリアクティブアプリケーションにブリッジします。

また、エミッターの onRequest()および onDispose()コールバックに2つの追加のラムダをアタッチして、MessageListenerContainerが[ X210X]フラックスのライフサイクル。

最後に、結果の Flux interval()で作成された別のとマージします。これにより、5秒ごとに新しいイベントが作成されます。 これらのダミーメッセージは、この場合に重要な機能を果たします:これらがないと、メッセージを受信して送信に失敗したときにのみクライアントの切断を検出します。これは、特定のユースケースによっては時間がかかる場合があります。 。

4.5. テスト

コンシューマーエンドポイントとパブリッシャーエンドポイントの両方をセットアップすると、サンプルアプリケーションでいくつかのテストを実行できるようになります。

application.yml で、RabbitMQのサーバー接続の詳細と少なくとも1つの宛先を定義する必要があります。これは次のようになります。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

spring.rabbitmq。*プロパティは、ローカルDockerコンテナで実行されているRabbitMQサーバーに接続するために必要な基本的なプロパティを定義します。 上記のIPは単なる例であり、特定の設定では異なる場合があることに注意してください。

キューはを使用して定義されます destinations.queues。 。* 、 どこ宛先名として使用されます。 ここでは、「NYSE」ルーティングキーを使用してRabbitMQの「nyse」取引所にメッセージを送信する「NYSE」という名前の単一の宛先を宣言しました。

コマンドラインまたはIDEからサーバーを起動すると、メッセージの送受信を開始できます。 を使用しますカールユーティリティ、Windows、Mac、LinuxOSの両方で利用できる共通のユーティリティ。

次のリストは、宛先にメッセージを送信する方法と、サーバーから予想される応答を示しています。

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

このコマンドを実行した後、メッセージがRabbitMQによって受信され、次のコマンドを発行して使用できる状態になっていることを確認できます。

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

これで、次のコマンドを使用してcurlを使用してメッセージを読み取ることができます。

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

ご覧のとおり、最初に以前に保存されたメッセージを取得し、次に5秒ごとにダミーメッセージの受信を開始します。

キューを一覧表示するコマンドを再度実行すると、メッセージが保存されていないことがわかります。

$ docker execrabbitmqrabbitmqctllist_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

5. シナリオ2:パブリッシュ/サブスクライブ

メッセージングアプリケーションのもう1つの一般的なシナリオは、単一のメッセージを複数のコンシューマーに送信する必要があるパブリッシュ/サブスクライブパターンです。

RabbitMQは、これらの種類のアプリケーションをサポートする2種類の交換を提供します。ファンアウトとトピックです。

これら2種類の主な違いは、後者ではルーティングキーパターンに基づいて受信するメッセージをフィルタリングできることです(例: 「alarm.mailserver。*」)は登録時に提供されますが、前者は着信メッセージをすべてのバインドされたキューに複製するだけです。

RabbitMQは、より複雑なメッセージフィルタリングを可能にするヘッダー交換もサポートしていますが、その使用はこの記事の範囲外です。

5.1. 目的地の設定

ポイントツーポイントのシナリオで行ったように、起動時に別の@PostConstructメソッドを使用してPub/Subの宛先を定義します。

唯一の違いは、 Exchanges のみを作成し、キューは作成しないことです。これらはオンデマンドで作成され、後でExchangeにバインドされます。各クライアント専用のキュー

@PostConstruct
public void setupTopicDestinations(
    destinationsConfig.getTopics()
      .forEach((key, destination) -> {
          Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
            amqpAdmin.declareExchange(ex);
      });
}

5.2. パブリッシャーエンドポイント

クライアントは、接続されているすべてのクライアントに送信されるメッセージを投稿するために、 / topic /{name}の場所で利用可能なパブリッシャーエンドポイントを使用します。

前のシナリオと同様に、 @PostMapping を使用して、メッセージの送信後のステータスでMonoを返します。

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig
      .getTopics()
      .get(name);
    
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }      
    
   return Mono.fromCallable(() -> {
       amqpTemplate.convertAndSend(
         d.getExchange(), d.getRoutingKey(),payload);   
            return ResponseEntity.accepted().build();
        });
    }

5.3. サブスクライバーエンドポイント

サブスクライバーエンドポイントは/topic / {name} に配置され、接続されたクライアントに対してFluxのメッセージを生成します。

これらのメッセージには、受信メッセージと5秒ごとに生成されるダミーメッセージの両方が含まれます。

@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics()
        .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
      });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

このコードは基本的に前のケースで見たものと同じですが、次の違いがあります。最初に、新しいサブスクライバーごとに新しいQueueを作成します。

これを行うには、 createTopicQueue()メソッドを呼び出します。このメソッドは、 DestinationInfo インスタンスからの情報を使用して、排他的で耐久性のないキューを作成し、それを[X198X ]設定されたルーティングキーを使用したExchange:

private Queue createTopicQueue(DestinationInfo destination) {

    Exchange ex = ExchangeBuilder
      .topicExchange(destination.getExchange())
      .durable(true)
      .build();
    amqpAdmin.declareExchange(ex);
    Queue q = QueueBuilder
      .nonDurable()
      .build();     
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q)
      .to(ex)
      .with(destination.getRoutingKey())
      .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

Exchange を再度宣言しているにもかかわらず、起動時にすでに宣言しているため、RabbitMQは新しいものを作成しないことに注意してください。

2番目の違いは、 onDispose()メソッドに渡すラムダにあります。このメソッドは、サブスクライバーが切断したときにQueueも削除します。

5.3. テスト

Pub-Subシナリオをテストするには、最初に次のようにout application.ymlでトピックの宛先を定義する必要があります。

destinations:
## ... queue destinations omitted      
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

ここでは、 / topic /Weatherの場所で利用できるトピックエンドポイントを定義しました。 このエンドポイントは、「WEATHER」ルーティングキーを使用してRabbitMQの「アラート」交換にメッセージを投稿するために使用されます。

サーバーを起動した後、rabitmqctlコマンドを使用して交換が作成されたことを確認できます。

$ docker exec docker_rabbitmq_1 rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
        direct
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

ここで、 list_bindings コマンドを発行すると、「アラート」交換に関連するキューがないことがわかります。

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

2つのコマンドシェルを開き、それぞれに次のコマンドを発行して、宛先にサブスクライブするいくつかのサブスクライバーを開始しましょう。

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

最後に、curlをもう一度使用して、サブスクライバーにアラートを送信します。

$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

メッセージを送信すると、「ハリケーンが近づいています!」というメッセージがほぼ瞬時に表示されます。 各サブスクライバーのシェルで。

使用可能なバインディングを確認すると、サブスクライバーごとに1つのキューがあることがわかります。

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

サブスクライバーのシェルでCtrl-Cを押すと、ゲートウェイは最終的にクライアントが切断されたことを検出し、それらのバインディングを削除します。

6. 結論

この記事では、spring-amqpモジュールを使用してRabbitMQサーバーと対話する単純なリアクティブアプリケーションを作成する方法を示しました。

ほんの数行のコードで、ポイントツーポイントとパブリッシュ/サブスクライブの両方の統合パターンをサポートする機能的なHTTP-to-AMQPゲートウェイを作成できました。これを簡単に拡張して、セキュリティなどの機能を追加できます。標準のSpring機能の追加。

この記事に示されているコードは、Githubでから入手できます。