RabbitMQとPythonのPukaを使用して複数のコンシューマーにメッセージを配信する方法
前提条件
RabbitMQ
RabbitMQを使用してメッセージを送受信するには、ソフトウェア自体をインストールして構成した後でのみ可能です。 RabbitMQのインストールと管理の方法では、RabbitMQを機能させる方法について詳しく説明しており、このメッセージブローカーを使用するための良い出発点です。
PukaPythonライブラリ
この記事のすべての例は、AMQPメッセージングプロトコルを処理するpukaライブラリでバックアップされたPython言語を使用して提示されています。 Pythonは、わかりやすくわかりやすい言語として選択されていますが、AMQPは広く採用されているプロトコルであるため、他のプログラミング言語を自由に使用して同様の目標を達成できます。
pukaを使用してすばやくインストールできます pip
–Pythonパッケージマネージャー。
pip install puka
pipは常にLinuxディストリビューションにバンドルされているわけではありません。 Debianベースのディストリビューション(Ubuntuを含む)では、以下を使用して簡単にインストールできます。
apt-get install python-pip
CentOSのようなRHELベースの場合:
yum install python-setuptools
easy_install pip
RabbitMQとその用語の紹介
メッセージング[RabbitMQは特に]、メッセージブローカーとそのメカニズムの基本原則を説明するいくつかの用語を紹介します。
-
Producer は、がメッセージを送信するパーティであるため、メッセージの作成が生成されます。
-
Consumer は、がメッセージを受信するパーティであるため、メッセージの受信には時間がかかります。
-
Queue は、送信されたメッセージが保存され、受信できる状態になっているバッファです。 1つのキューが保持できるメッセージの数に制限はありません。 また、キューにメッセージを送信できるプロデューサーの数や、キューにアクセスを試みることができるコンシューマーの数にも制限はありません。 メッセージが既存のキューに到達すると、その特定のキューにアクセスするコンシューマーによって消費されるまでそこで待機します。 メッセージが存在しないキューにヒットすると、メッセージは破棄されます。
-
Exchange は、プロデューサーとキューの間に存在するエンティティです。 プロデューサーがメッセージをキューに直接送信することはありません。 メッセージをエクスチェンジに送信し、エクスチェンジは、使用されるエクスチェンジに応じて、メッセージを1つ以上のキューに配置します。 現実の比喩を使用するために、交換は郵便配達員のようなものです。それはメッセージを処理して、消費者がそれらを集めることができる適切なキュー(メールボックス)に配信されるようにします。
-
Binding は、キューと交換の間の接続です。 特定の取引所にバインドされたキューは、取引所によって処理されます。 どの程度正確に交換自体に依存します。
このテキストでは、5つの用語すべてが使用されます。 もう1つ、厳密にpuka pythonライブラリに関連するものがあります。これは、明確にするために選択されたライブラリとして選択されました。 これはpromiseであり、AMQPサーバーへの同期要求として理解され、要求の実行(成功または失敗)を保証し、クライアントは要求が完了するまで待機します。
pukaは非同期で動作できますが、この例では、pukaを同期ライブラリとして使用します。 つまり、各リクエスト(約束)の後、pukaは実行されるまで待機してから次のステップに進みます。
簡単な例でRabbitMQとPukaをテストする
メッセージブローカーとpukaが完全に機能するかどうかをテストし、メッセージの送受信が実際にどのように機能するかを把握するには、次の名前のサンプルpythonスクリプトを作成します。 rabbit_test.py
vim rabbit_test.py
スクリプトの内容を貼り付けます。
import puka
# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")
# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)
# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)
# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)
# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)
print "Message sent!"
# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)
print "Starting receiving!"
while True:
received_message = consumer.wait(receive_promise)
print "GOT: %r" % (received_message['body'],)
break
:wq を押してファイルを保存し、終了します。
スクリプトを実行すると、スクリプトによって送信されたメッセージが RabbitMQ キューに出力されます。これは、テストプログラムが直後にメッセージを受信するためです。 出力は次のようになります。
root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'
root@rabbitmq:~#
このコードで何が起こるかを説明するために、ステップバイステップで進みましょう。
-
コンシューマーとプロデューサーの両方が作成され、同じRabbitMQサーバーに接続され、
localhost
-
プロデューサーは、メッセージが生成されるときにキューが存在することを確認するために、キューを宣言します。 このステップがなかった場合、キューが存在しない可能性があるため、メッセージはすぐに破棄される可能性があります。
-
プロデューサーは、事前に作成されたキューを指定するルーティングキーを使用して、メッセージを nameless_exchange (交換については後で詳しく説明します)に送信します。 その後、メッセージは取引所に届き、取引所はそれを「ウサギ」キューに入れます。 メッセージは、誰かがそれを消費するまでそこに留まります。
-
コンシューマーは「ウサギ」キューにアクセスし、そこに保存されているメッセージの受信を開始します。 待機中のメッセージが1つあるため、すぐに配信されます。 消費されます。つまり、キューに残りません。
-
消費されたメッセージが画面に印刷されます。
ファンアウト交換
前の例では、名前のない交換を使用して、「rabbit」という名前の特定のキューにメッセージを配信しました。 名前のない交換が機能するにはキュー名が必要です。つまり、単一のキューにのみメッセージを配信できます。
RabbitMQ には他のタイプの交換もあり、そのうちの1つはファンアウトであり、このテキストの主な関心事です。 ファンアウト交換は、認識しているALLキューにメッセージを配信するシンプルなブラインドツールです。 ファンアウト交換では、特定のキュー名を指定する必要はありません(実際には不可能です)。 その種類の交換にヒットするメッセージは、メッセージが生成される前に、交換にバインドされているすべてのキューに配信されます。 交換に接続できるキューの数に制限はありません。
パブリッシュ/サブスクライブパターン
ファンアウト交換を使用すると、パブリッシュ/サブスクライブパターンを簡単に作成でき、すべてのニュースレターを開くように機能します。 ニュースレターの放送局であるプロデューサーは、知らないかもしれない視聴者に定期的にメッセージを送信します(メッセージを作成し、ニュースレターのファンアウト交換に送信します)。 新しい購読者はニュースレターを申請します(同じニュースレターのファンアウトに自分のキューをバインドします)。 その瞬間から、ニュースレターのファンアウト交換は、登録されているすべてのサブスクライバー(キュー)にメッセージを配信します。
1対1のメッセージングは非常に簡単で、開発者は他の通信手段を使用することがよくありますが、1対多(「多く」は指定されておらず、少数からロットの間であれば何でもかまいません。 ])は、メッセージブローカーが非常に役立つ可能性がある非常に人気のあるシナリオです。
プロデューサーアプリケーションの作成
プロデューサーアプリケーションの唯一の役割は、名前付きのファンアウト交換を作成し、その交換に定期的なメッセージ(数秒ごと)を生成することです。 実際のシナリオでは、メッセージはある理由で生成されます。 例を単純化するために、メッセージは自動生成されます。 このアプリケーションは、ニュースレターの発行者として機能します。
名前の付いたPythonスクリプトを作成します newsletter_produce.py
vim newsletter_produce.py
スクリプトの内容を貼り付けます。
import puka
import datetime
import time
# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)
# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)
# send current time in a loop
while True:
message = "%s" % datetime.datetime.now()
message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
producer.wait(message_promise)
print "SENT: %s" % message
time.sleep(1)
producer.close()
コードで何が起こるかを説明するために、例を段階的に見ていきましょう。
-
プロデューサークライアントが作成され、ローカルのRabbitMQインスタンスに接続されます。 これからは、RabbitMQと自由に通信できるようになります。
-
名前付き
newsletter
ファンアウト交換が作成されます。 そのステップの後、交換はRabbitMQサーバー上に存在し、キューをサーバーにバインドしてメッセージを送信するために使用できます。 -
無限ループでは、現在の時刻のメッセージが
newsletter
両替。 ご了承くださいrouting_key
空です。これは、特定のキューが指定されていないことを意味します。 その後、適切なキューにメッセージを配信するのは交換です。
アプリケーションは、実行時に、すべてのニュースレター購読者に現在の時刻を通知します。
消費者向けアプリケーションの作成
コンシューマーアプリケーションは一時キューを作成し、それを名前付きファンアウトエクスチェンジにバインドします。 その後、メッセージの待機を開始します。 キューを交換にバインドした後、以前に作成されたプロデューサーによって送信されたすべてのメッセージは、このコンシューマーによって受信されるものとします。 このアプリケーションはニュースレターの購読者として機能します。アプリケーションを一度に複数回実行しても、すべてのインスタンスがブロードキャストメッセージを受信します。
名前の付いたPythonスクリプトを作成します newsletter_consume.py
vim newsletter_consume.py
スクリプトの内容を貼り付けます。
import puka
# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)
# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']
# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)
# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)
while True:
message = consumer.wait(message_promise)
print "GOT: %r" % message['body']
consumer.close()
コンシューマーコードは、プロデューサーのコードよりも少し複雑です。 ステップバイステップでそれを調べてみましょう:
-
コンシューマークライアントが作成され、ローカルのRabbitMQインスタンスに接続されます。
-
一時キューが作成されます。 一時的とは、名前が指定されていないことを意味します。キュー名は、RabbitMQによって自動生成されます。 また、このようなキューは、クライアントが切断した後に破棄されます。 これは、交換の1つにバインドするためだけに存在し、他の特別な目的を持たないキューを作成する一般的な方法です。 何かを受け取るにはキューを作成する必要があるので、キュー名を考えないようにするのが便利な方法です。
-
作成されたキューはにバインドされます
newsletter
両替。 その瞬間から、ファンアウト交換はすべてのメッセージをそのキューに配信します。 -
エンドレスループでは、コンシューマーはキューで待機し、キューに到達するすべてのメッセージを受信して画面に出力します。
アプリケーションを実行すると、ニュースレターの発行元から時間通知を受け取ります。 一度に複数回実行することができ、このアプリケーションのすべてのインスタンスが現在の時刻を取得します。
両方のアプリケーションのテスト
ニュースレターの発行元とその利用者をテストするには、仮想サーバーへの複数のSSHセッションを開きます(ローカルコンピューターで作業している場合は、複数のターミナルウィンドウを開きます)。 ウィンドウの1つで、プロデューサーアプリケーションを実行します。
root@rabbitmq:~# python newsletter_produce.py
現在の時刻が毎秒表示され始めます。
SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000
...
他のすべてのウィンドウで、コンシューマーアプリケーションを実行します。
root@rabbitmq:~# python newsletter_consume.py
このアプリケーションのすべてのインスタンスは、プロデューサーによってブロードキャストされた時間通知を受信します。
GOT: 2014-02-11 17:24:47.309000
GOT: 2014-02-11 17:24:48.310000
GOT: 2014-02-11 17:24:49.312000
GOT: 2014-02-11 17:24:50.316000
...
これは、RabbitMQがファンアウト交換を適切に登録し、サブスクライバーキューをこの交換にバインドし、送信されたメッセージを適切なキューに配信したことを意味します。 つまり、RabbitMQは期待どおりに機能しました。
参考文献
パブリッシュ/サブスクライブは、多くの場合便利な単純な(概念と実装の両方の)メッセージングパターンです。 ただし、RabbitMQの制限にはほど遠いです。 高度なメッセージルーティング、メッセージ確認応答、セキュリティ、永続性など、RabbitMQを使用してメッセージングの問題を解決する方法は無数にあります。
このテキストの主な目的は、簡単な例を使用して基本的なメッセージングの概念を紹介することでした。 他の多くの使用法については、公式のRabbitMQドキュメントで詳しく説明されています。これはRabbitMQユーザーと管理者にとって優れたリソースです。