入門


前提条件


このテキストは、 RabbitMQとPythonのPukaを使用して複数のコンシューマーにメッセージを配信する方法の続きであり、同じソフトウェアバンドルを正しく実行する必要があります。 また、同じ定義が記事全体で使用されており、読者は前のテキストの主題に精通していることを前提としています。

取引所


すでに説明しました fanout 交換。追加のルールを設定せずに、その交換にバインドされたすべてのキューにメッセージを配信します。 これは非常に便利なメカニズムですが、柔軟性に欠けています。 多くの場合、プロデューサーが取引所に発行するすべてのものを受け取ることは望ましくありません。 RabbitMQ は、より複雑なシナリオを実装するために使用できる2つの異なる交換タイプを提供します。 これらの1つは direct 両替。

直接交換


序章


直接交換は、RabbitMQでシンプルなキーベースのルーティングメカニズムを提供します。 これは、最初の例で使用された名前のない交換にいくぶん似ています。この例では、メッセージのルーティングキーに等しい名前のキューにメッセージが配信されました。 ただし、名前のない交換では、明示的なキューバインディングを定義する必要はありませんでしたが、直接交換では、バインディングは重要で必須です。

直接交換を使用する場合、その交換に対して生成される各メッセージには、任意の名前文字列であるルーティングキーを指定する必要があります。 テキサス。 次に、メッセージは、同じルーティングキーを使用してこの交換にバインドされているすべてのキューに配信されます( Texas ルーティングキーを使用してメッセージに関心があると明示的に宣言されたすべてのキュー)。

基本的な無名交換との最大の違い direct 交換とは、後者にはバインディングが必要であり、その前にその交換でメッセージをリッスンするキューはないということです。 その結果、3つの大きな利点が得られます。

  1. 1つのキューをバインドして、同じ交換で多くの異なるルーティングキーをリッスンできます

  2. 1つのキューをバインドして、多くのの異なる交換を一度にリッスンできます

  3. 多くのキューをバインドして、交換で同じルーティングキーをリッスンできます

大都市のハブを想像してみましょう。鉄道とバスの駅が1つになっていて、両方の交通手段で多くの目的地にアクセスできます。 そして、ステーションがRabbitMQを使用して出発通知をディスパッチしたいとします。 タスクは、シアトルトゥーイル、またはボストン行きのバスまたは電車がまもなく出発することを関心のあるすべての人に知らせることです。

そのようなプログラムは直接を定義します departures 関心のあるすべての顧客がキューをサブスクライブできる交換。 次に、出発時刻を含むメッセージが、宛先を含むルーティングキーを使用してその交換に生成されます。 例えば:

  1. へのメッセージ departures ルーティングキーとの交換 Tooele と体 2014-01-03 15:23

  2. へのメッセージ departures ルーティングキーとの交換 Boston と体 2014-01-03 15:41

  3. へのメッセージ departures ルーティングキーとの交換 Seattle と体 2014-01-03 15:55

1つのキューが一度に多くのルーティングキーにバインドされる可能性があり、多くのキューが同じキーにバインドされる可能性があるため、次のように簡単に設定できます。

  1. Tooeleのみに関心のある1人の顧客

  2. ボストンのみに関心のある1人の顧客

  3. TooeleBostonに同時に興味を持っている別の顧客

すべてが同時に情報を待っています。 彼らは私たちの直接交換を使用して適切なメッセージを受信します。

プロデューサー


この例のタスクを少し単純化するために、1つのコマンドラインパラメーターを受け入れる基本的な通知ディスパッチャーを作成しましょう。 宛先を指定し、アプリケーションは関心のあるすべての消費者に現在の時刻を送信します。

名前の付いたサンプルPythonスクリプトを作成します direct_notify.py

vim direct_notify.py

スクリプトの内容を貼り付けます。

import puka
import datetime
import time
import sys

# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# create a direct exchange named departures
exchange_promise = producer.exchange_declare(exchange='departures', type='direct')
producer.wait(exchange_promise)

# send current time to destination specified with command line argument
message = "%s" % datetime.datetime.now()

message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message)
producer.wait(message_promise)

print "Departure to %s at %s" % (sys.argv[1], message)
    
producer.close()

:wq を押してファイルを保存し、終了します。

1つのパラメーターを使用してスクリプトを実行すると、現在の時刻と使用されている宛先が出力されます。 出力は次のようになります。

root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#

スクリプトを段階的に見ていきましょう。

  1. Producer クライアントが作成され、ローカルのRabbitMQインスタンスに接続されます。 これからは、RabbitMQと自由に通信できるようになります。

  2. 名前付き departures 直接交換が作成されます。 その取引所に公開されたメッセージには異なるキーを割り当てることができるため、作成時にルーティングキーを指定する必要はありません。 そのステップの後、交換はRabbitMQサーバー上に存在し、キューをサーバーにバインドしてメッセージを送信するために使用できます。

  3. コマンドラインパラメータをルーティングキーとして使用して、現在の時刻を含むメッセージがその取引所に公開されます。 サンプル実行では、Tooeleがパラメータとして使用されているため、出発地(ルーティングキー)として使用されています。

注:簡単にするために、スクリプトは必須のコマンドライン引数が指定されているかどうかをチェックしません。 パラメータなしで実行すると正しく動作しません。

消費者


この例の消費者向けアプリケーションは、駅から到達可能な1つ以上の目的地に関心のある公共交通機関の顧客として機能します。

名前の付いたサンプルPythonスクリプトを作成します direct_watch.py

vim direct_watch.py

スクリプトの内容を貼り付けます。

import puka
import sys

# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# bind the queue to all routing keys specified by command line arguments
for destination in sys.argv[1:]:
    print "Watching departure times for %s" % destination
    bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination)
    consumer.wait(bind_promise)

# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)

while True:
    message = consumer.wait(message_promise)
    print "Departure for %s at %s" % (message['routing_key'], message['body'])
    
consumer.close()

:wq を押してファイルを保存し、終了します。

1つのパラメータTooeleを使用してスクリプトを実行すると、スクリプトが Tooele の出発時刻を監視していることを通知する必要がありますが、複数のパラメータを使用してスクリプトを実行すると、多くの宛先の出発時刻を監視していることが通知されます。

root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(...)
root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(...)
root@rabbitmq:~#

スクリプトの機能を説明するために、スクリプトを段階的に見ていきましょう。

  1. Consumer クライアントが作成され、ローカルのRabbitMQインスタンスに接続されます。 これからは、RabbitMQと自由に通信できるようになります。

  2. この特定のコンシューマーの一時キューが作成され、RabbitMQによって自動生成された名前が付けられます。 スクリプトが終了すると、キューは破棄されます。

  3. キューはすべてにバインドされています departures コマンドラインパラメータを使用して指定されたすべてのルーティングキー(宛先)を交換し、各宛先の情報を画面に印刷します。

  4. スクリプトは、キューでメッセージの待機を開始します。 バインドされたルーティングキーに一致するすべてのメッセージを受信する必要があります。 Tooele を単一のパラメーターとして実行する場合(TooeleBostonの両方を使用する場合のみ)-両方で。 各出発時刻は画面に印刷されます。

テスト


両方のスクリプトが期待どおりに機能するかどうかを確認するには、サーバーに対して3つのターミナルウィンドウを開きます。 1つは、通知を送信するための公共交通機関として使用されます。 別の2つは、出発を待つ顧客として機能します。

最初のターミナルで、 direct_notify.py 任意のパラメータを使用して1回スクリプトを作成します。

root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~#

重要: direct_notify.py スクリプトは、キューをバインドする前に交換を作成する必要があるため、コンシューマーの前に少なくとも1回実行する必要があります。 実行後、交換はRabbitMQサーバーに残り、自由に使用できます。

2番目のターミナルで、 direct_watch.py 1つのパラメータを持つスクリプト-Tooele

root@rabbitmq:~# python direct_watch.py Tooele
Watching departure times for Tooele
(...)
root@rabbitmq:~#

3番目のターミナルで、 direct_watch.py TooeleBostonの2つのパラメーターを持つスクリプト:

root@rabbitmq:~# python direct_watch.py Tooele Boston
Watching departure times for Tooele
Watching departure times for Boston
(...)
root@rabbitmq:~#

次に、最初のターミナルに戻り、3つの出発通知を送信します。 1つはTooele、1つは Boston 、もう1つはChicagoです。

root@rabbitmq:~# python direct_notify.py Tooele
Departure to Tooele at 2014-02-18 15:57:29.035000
root@rabbitmq:~# python direct_notify.py Boston
Departure to Tooele at 2014-02-18 15:57:31.035000
root@rabbitmq:~# python direct_notify.py Chicago
Departure to Tooele at 2014-02-18 15:57:35.035000
root@rabbitmq:~#

最初の通知は、トゥーイルへの出発を待っている両方の消費者のみが受信する必要があります。 2つ目は、ボストンへの出発を待っている消費者だけに届くようにする必要があります。 3つ目は、シカゴへの出発を待つ消費者がいないため、これらの消費者が受け取るべきではありません。

これは予想される動作です。 これらの簡単な例は、ルーティングキーで指定された特定のコンシューマーのみが受信するメッセージをディスパッチする方法を示しています。

参考文献


直接ルーティングでは、メッセージの配信先を完全に制御することはできませんが、 fanout 以前の交換で使用された交換で、どこにでもメッセージを盲目的に配信します。 と direct 多くの実際のメッセージングシナリオを交換することができ、プロセスはそれほど難しくありません。

このテキストの主な目的は、単純な現実世界の状況を使用した基本的な直接ルーティングを紹介することでした。 他の多くの使用法については、公式のRabbitMQドキュメントで詳しく説明されています。これはRabbitMQユーザーと管理者にとって優れたリソースです。