1. 概要

ソフトウェアコンポーネントの分離は、ソフトウェア設計の最も重要な部分の1つです。 これを実現する1つの方法は、コンポーネント(サービス)間の非同期通信方法を提供するメッセージングシステムを使用することです。 この記事では、そのようなシステムの1つであるRabbitMQについて説明します。

RabbitMQは、Advanced Message Queuing Protocol( AMQP )を実装するメッセージブローカーです。 主要なプログラミング言語用のクライアントライブラリを提供します。

ソフトウェアコンポーネントのデカップリングに使用する以外に、RabbitMQは次の目的で使用できます。

  • バックグラウンド操作の実行
  • 非同期操作の実行

2. メッセージングモデル

まず、メッセージングがどのように機能するかを簡単に高レベルで見てみましょう。

簡単に言えば、メッセージングシステムと相互作用するアプリケーションには、プロデューサーとコンシューマーの2種類があります。 プロデューサーとは、ブローカーにメッセージを送信(公開)するプロデューサーと、ブローカーからメッセージを受信するコンシューマーです。 通常、このプログラム(ソフトウェアコンポーネント)は異なるマシンで実行され、RabbitMQはそれらの間の通信ミドルウェアとして機能します。

この記事では、RabbitMQを使用して通信する2つのサービスを使用した簡単な例について説明します。 サービスの1つはRabbitMQにメッセージを公開し、もう1つは消費します。

3. 設定

最初に、公式セットアップガイドここを使用してRabbitMQを実行しましょう。

当然、RabbitMQサーバーとの対話にはJavaクライアントを使用します。 このクライアントのMaven依存関係は次のとおりです。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.0</version>
</dependency>

公式ガイドを使用してRabbitMQブローカーを実行した後、Javaクライアントを使用してそれに接続する必要があります。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

ConnectionFactory を使用してサーバーとの接続をセットアップし、プロトコル(AMQP)と認証も処理します。 ここでは、 localhost でサーバーに接続し、setHost関数を使用してホスト名を変更できます。

デフォルトのポートがRabbitMQサーバーで使用されていない場合は、setPortを使用してポートを設定できます。 RabbitMQのデフォルトポートは15672です。

factory.setPort(15678);

ユーザー名とパスワードを設定できます。

factory.setUsername("user1");
factory.setPassword("MyPassword");

さらに、この接続を使用してメッセージを公開および消費します。

4. プロデューサー

WebアプリケーションでユーザーがWebサイトに新製品を追加できる簡単なシナリオを考えてみましょう。 新製品が追加されたときはいつでも、お客様にメールを送信する必要があります。

まず、キューを定義しましょう。

channel.queueDeclare("products_queue", false, false, false, null);

ユーザーが新製品を追加するたびに、キューにメッセージを公開します。

String message = "product details"; 
channel.basicPublish("", "products_queue", null, message.getBytes());

最後に、チャネルと接続を閉じます。

channel.close();
connection.close();

このメッセージは、顧客への電子メールの送信を担当する別のサービスによって消費されます。

5. 消費者

消費者側に何を実装できるか見てみましょう。 同じキューを宣言します。

channel.queueDeclare("products_queue", false, false, false, null);

キューからのメッセージを非同期的に処理するコンシューマーを定義する方法は次のとおりです。

DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
     public void handleDelivery(
        String consumerTag,
        Envelope envelope, 
        AMQP.BasicProperties properties, 
        byte[] body) throws IOException {
 
            String message = new String(body, "UTF-8");
            // process the message
     }
};
channel.basicConsume("products_queue", true, consumer);

6. 結論

この簡単な記事では、RabbitMQの基本的な概念を取り上げ、それを使用した簡単な例について説明しました。

このチュートリアルの完全な実装は、GitHubプロジェクトにあります。