1. 概要

このチュートリアルでは、メッセージキューとパブリッシャー/サブスクライバーの使用について説明します。 これらは、2つ以上のサービスが相互に通信するために分散システムで使用される一般的なパターンです。

このチュートリアルでは、すべての例がRabbitMQメッセージブローカーを使用して示されるため、最初にRabbitMQのチュートリアルに従ってローカルで起動して実行します。 RabbitMQの詳細については、他のチュートリアルを確認してください。

注: Kafka Google Cloud Pub-Sub Amazon SQS など、このチュートリアルの同じ例に使用できるRabbitMQの代替手段は多数あります。ほんの少し例を挙げれば。

2. メッセージキューとは何ですか?

メッセージキューを確認することから始めましょう。 メッセージキューは、キューを介して通信するパブリッシングサービスと複数のコンシューマーサービスで構成されます。この通信は通常、パブリッシャーがコンシューマーにコマンドを発行する一方向です。 パブリッシングサービスは通常、メッセージをキューまたは交換に配置し、単一のコンシューマーサービスがこのメッセージを消費し、これに基づいてアクションを実行します。

次の交換を検討してください。

 

これから、メッセージ’m n+1’をキューに入れているPublisherサービスを確認できます。 さらに、消費されるのを待っているキューにすでに存在する複数のメッセージを確認することもできます。 右側には、メッセージのキューをリッスンしている2つの消費サービス「A」と「B」があります。

しばらくしてから同じ交換について考えてみましょう。

 

まず、パブリッシャーのメッセージがキューの最後にプッシュされていることがわかります。 次に、考慮すべき重要な部分は画像の右側です。 コンシューマー「A」がメッセージ「m1」を読み取ったため、他のサービス「B」が消費するキューで使用できなくなっていることがわかります。

2.1. メッセージキューを使用する場所

メッセージキューは、サービスから作業を委任する場合によく使用されます。 そうすることで、作業が1回だけ実行されるようにします。

メッセージキューの使用は、マイクロサービスアーキテクチャで人気があり、クラウドベースまたはサーバーレスアプリケーションの開発中に、負荷に基づいてアプリを水平方向にスケーリングできるためです。

たとえば、処理を待機しているキューに多数のメッセージがある場合、同じメッセージキューをリッスンし、メッセージの流入を処理する複数のコンシューマーサービスを起動できます。 メッセージが処理されると、トラフィックが最小になったときにサービスをオフにして、ランニングコストを節約できます。

2.2. RabbitMQの使用例

わかりやすくするために例を見てみましょう。 この例では、ピザレストランの形を取ります。 人々がアプリを介してピザを注文でき、ピザ屋のシェフが入ってきたときに注文を受け取ると想像してみてください。 この例では、顧客は私たちの発行者であり、シェフは私たちの消費者です。

まず、キューを定義しましょう。

private static final String MESSAGE_QUEUE = "pizza-message-queue";

@Bean
public Queue queue() {
    return new Queue(MESSAGE_QUEUE);
}

Spring AMQPを使用して、「pizza-message-queue」という名前のキューを作成しました。 次に、新しく定義したキューにメッセージを投稿するパブリッシャーを定義しましょう。

public class Publisher {

    private RabbitTemplate rabbitTemplate;
    private String queue;

    public Publisher(RabbitTemplate rabbitTemplate, String queue) {
        this.rabbitTemplate = rabbitTemplate;
        this.queue = queue;
    }

    @PostConstruct
    public void postMessages() {
        rabbitTemplate.convertAndSend(queue, "1 Pepperoni");
        rabbitTemplate.convertAndSend(queue, "3 Margarita");
        rabbitTemplate.convertAndSend(queue, "1 Ham and Pineapple (yuck)");
    }
}

Spring AMQPは、構成のオーバーヘッドを削減するためにRabbitMQ交換に接続する RabbitTemplateBeanを作成します。 私たちのパブリッシャーは、キューに3つのメッセージを送信することでこれを利用します。

ピザの注文が入ったので、別の消費者向けアプリケーションが必要です。 これは、例でシェフとして機能し、メッセージを読みます。

public class Consumer {
    public void receiveOrder(String message) {
        System.out.printf("Order received: %s%n", message);
    }
}

次に、リフレクションを使用してコンシューマーの受信注文メソッドを呼び出すキュー用のMessageListenerAdapterを作成しましょう。

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(MESSAGE_QUEUE);
    container.setMessageListener(listenerAdapter);
    return container;
}

@Bean
public MessageListenerAdapter listenerAdapter(Consumer consumer) {
    return new MessageListenerAdapter(consumer, "receiveOrder");
}

キューから読み取られたメッセージは、ConsumerクラスのreceiveOrderメソッドにルーティングされます。 このアプリケーションを実行するために、受注を処理するために必要な数のコンシューマーアプリケーションを作成できます。 たとえば、400のピザの注文がキューに入れられた場合、複数の消費者の「シェフ」が必要になる可能性があります。そうしないと、注文が遅くなります。 この場合、10個のコンシューマーインスタンスを起動して、タイムリーに注文を処理することができます。

3. Pub-Subとは何ですか?

メッセージキューについて説明したので、pub-subを調べてみましょう。 逆に、メッセージキューの場合、pub-subアーキテクチャでは、すべての消費(サブスクライブ)アプリケーションが、パブリッシャーがエクスチェンジに投稿するメッセージの少なくとも1つのコピーを取得する必要があります。

次の交換を検討してください。

 

左側には、トピックに「m n+1」というメッセージを送信する発行者がいます。 このトピックは、このメッセージをサブスクリプションにブロードキャストします。 これらのサブスクリプションはキューにバインドされています。 各キューには、メッセージを待機しているリスニングサブスクライバサービスがあります。

しばらくしてから同じ交換を考えてみましょう。

 

両方がこのメッセージのコピーを受信したため、両方のサブスクライブサービスが「m1」を消費しています。 さらに、トピックは新しいメッセージ「m n+1」をすべてのサブスクライバーに配布しています。

Pub subは、各サブスクライバーがメッセージのコピーを取得することを保証する必要がある場合に使用する必要があります。

3.1. RabbitMQの使用例

衣料品のウェブサイトがあると想像してみてください。 このウェブサイトは、ユーザーに取引を通知するためのプッシュ通知を送信することができます。 私たちのシステムは、電子メールまたはテキストアラートを介して通知を送信できます。 このシナリオでは、Webサイトが発行者であり、テキストおよび電子メールアラートサービスがサブスクライバーです。

まず、トピック交換を定義し、それに2つのキューをバインドしましょう。

private static final String PUB_SUB_TOPIC = "notification-topic";
private static final String PUB_SUB_EMAIL_QUEUE = "email-queue";
private static final String PUB_SUB_TEXT_QUEUE = "text-queue";

@Bean
public Queue emailQueue() {
    return new Queue(PUB_SUB_EMAIL_QUEUE);
}

@Bean
public Queue textQueue() {
    return new Queue(PUB_SUB_TEXT_QUEUE);
}

@Bean
public TopicExchange exchange() {
    return new TopicExchange(PUB_SUB_TOPIC);
}

@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange exchange) {
    return BindingBuilder.bind(emailQueue).to(exchange).with("notification");
}

@Bean
public Binding textBinding(Queue textQueue, TopicExchange exchange) {
    return BindingBuilder.bind(textQueue).to(exchange).with("notification");
}

これで、ルーティングキー「通知」を使用して2つのキューをバインドしました。これは、このルーティングキーを使用してトピックに投稿されたメッセージが両方のキューに送信されることを意味します。 以前に作成したPublisherクラスを更新すると、いくつかのメッセージを取引所に送信できます。

rabbitTemplate.convertAndSend(topic, "notification", "New Deal on T-Shirts: 95% off!");
rabbitTemplate.convertAndSend(topic, "notification", "2 for 1 on all Jeans!");

4. 比較

両方の領域に触れたので、両方のタイプの交換を簡単に比較してみましょう。

前述のように、メッセージキューとpub-subアーキテクチャパターンはどちらも、アプリケーションを分割して水平方向にスケーラブルにするための優れた方法です。

pub-subキューまたはメッセージキューのいずれかを使用するもう1つの利点は、通信が従来の同期モードの通信よりも耐久性があることです。たとえば、アプリAが非同期HTTP呼び出しを介してアプリBと通信する場合、アプリケーションがダウンすると、データが失われ、リクエストを再試行する必要があります。

コンシューマーアプリケーションインスタンスがダウンした場合にメッセージキューを使用すると、代わりに別のコンシューマーがメッセージを処理できるようになります。 pub-subを使用すると、サブスクライバーがダウンしている場合、サブスクライバーが見逃したメッセージを回復すると、サブスクライブキューで使用できるようになります。

最後に、コンテキストが重要です。 pub-subアーキテクチャとメッセージキューアーキテクチャのどちらを使用するかを選択することは、消費するサービスをどのように動作させるかを正確に定義することになります。 覚えておくべき最も重要な要素は、「すべての消費者がすべてのメッセージを受け取るかどうかは重要ですか?」と尋ねることです。

5. 結論

このチュートリアルでは、pub-subキューとメッセージキュー、およびそれぞれの特徴のいくつかを見てきました。 このチュートリアルで言及されているすべてのコードは、GitHubにあります。