序章


ZeroMQを説明するために選択できる方法はたくさんあります。 それにもかかわらず、それは実際の状態のままです。豊富で成熟した機能セットで開発者に大きな利益をもたらす、真に注目に値するコミュニケーションライブラリです。

DigitalOcean ZeroMQの記事のこの2回目の記事では、アプリケーションのインストールに関する前回の記事に続いて、その使用法を詳しく調べ、この高速で強力なライブラリを実際に実装する方法を発見します。 プロセス間の単純なメッセージングから始めて、連続するセクションに分割されたさまざまな例を見ていきます(つまり、 要求/応答パターンを使用)。

注:この記事は、このテーマに関する2番目の記事です。 あなたがそれについてもっと学ぶことに興味があるなら(すなわち それが何であるか、そしてそれが完全なメッセージブローカーとどのように比較されるか)、このチュートリアルを読む前にZeroMQの紹介とインストールのハウツーをチェックしてください。


ZeroMQ


ZeroMQは、アプリケーションとプロセスの間にメッセージングおよび通信システムを高速かつ非同期に実装するために使用されるライブラリです。

RabbitMQなどの他のアプリケーションメッセージングソリューションの過去の経験がある場合、ZeroMQの正確な位置を理解するのは少し難しいかもしれません。

エンタープライズメッセージングの必要なすべての部分を提供するいくつかのはるかに大規模なプロジェクトと比較すると、ZeroMQは、独自のツールを作成するための軽量で高速なツールとして残っています。

この記事


技術的にはフレームワークではありませんが、その機能と解決するタスクの重要な位置を考えると、ZeroMQをアプリケーションの実際の通信レイヤーを実装するためのバックボーンと見なすことができます。

この記事では、あなたができるすべてのことであなたを刺激するいくつかの例を提供することを目指しています。

注:この例では、Python言語とその従来のインタープリター(Python Cインタープリター)を使用します。 必要な言語バインディングをインストールすると、コードを簡単に翻訳して、問題なくお気に入りを使用できるようになります。 CentOS VPSへのPythonのインストールについて知りたい場合は、 CentOS6.4でPython2.7をセットアップする方法のチュートリアルを確認してください。

ZeroMQを使用したプログラミング


ライブラリとしてのZeroMQは、特定のネットワーク通信パターンに従うことにより、ソケットを介して機能します。 これは非同期で動作するように設計されており、その名前のMQサフィックスは、メッセージを送信する前にスレッドキューイングメッセージから取得されます。

ZeroMQソケットタイプ


ZeroMQは、ソケットの動作方法が異なります。 通常のソケットが機能する同期方法とは異なり、ZeroMQのソケット実装は「非同期メッセージキューの抽象化を提供します」。

これらのソケットの動作方法は、選択したソケットのタイプによって異なります。 また、送信されるメッセージのフローは、選択したパターンによって異なります。そのうちの4つは次のとおりです。

  • 要求/応答パターン:要求を送信し、送信されるたびに後続の応答を受信するために使用されます。

  • パブリッシュ/サブスクライブパターン:単一のプロセスからデータを配布するために使用されます(例: パブリッシャー)から複数の受信者(例: サブスクライバー)。

  • パイプラインパターン:接続されたノードにデータを配信するために使用されます。

  • 排他的ペアパターン: 2つのピアを接続して、ペアを形成するために使用されます。

ZeroMQトランスポートタイプ


ZeroMQは、通信用に4種類のトランスポートを提供します。 これらは:

  • インプロセス(INPROC):ローカル(インプロセス)通信トランスポート。

  • プロセス間(IPC):ローカル(プロセス間)通信トランスポート。

  • TCP:TCPを使用したユニキャスト通信トランスポート。

  • PGM:PGMを使用したマルチキャスト通信トランスポート。

ZeroMQアプリケーションの構造化


ZeroMQは、通常の従来の通信設定とは動作が異なります。 リンクのどちらの側にも含めることができます(つまり、 サーバーまたはクライアントのいずれか)をバインドして接続を待ちます。 標準のソケットとは異なり、ZeroMQは、接続が発生する可能性があることを認識しているという概念で機能するため、接続を完全に待機できます。

クライアント-サーバー構造


クライアントとサーバーのコードを構造化するには、バインディング側としてより安定しているものを決定し、接続として他のものを選択するのが最善です。

例:

Server Application                           Client Application
---------------------[ < .. < .. < .. < .. ......................
Bound -> Port:8080                          Connects <- Port:8080

クライアント-プロキシ-サーバー構造


通信の両端が動的な(したがって不安定な)状態にあることによって引き起こされる問題を解決するために、ZeroMQはネットワークデバイスを提供します(つまり、 箱から出してすぐに使える道具)。 これらのデバイスは2つの異なるポートに接続し、接続を相互にルーティングします。

  • ストリーマー:パイプライン化されたパラレル通信用のストリーマーデバイス。
  • フォワーダー: pub/sub通信用のフォワーディングデバイス。
  • キュー:要求/応答通信用の転送デバイス。

例:

   Server App.            Device | Forward           Client App.
  ............ > .. > . ]------------------[ < .. < .. .........
    Connects               2 Port Binding             Connects

プログラミング例


これまでのセクションの知識を使用して、簡単なアプリケーションを作成するためにそれらを利用し始めます。

注:以下の例は通常、同時に実行されるアプリケーションで構成されています。 たとえば、クライアント/サーバーのセットアップを機能させるには、クライアントとサーバーアプリケーションの両方を一緒に実行する必要があります。 これを行う方法の1つは、LinuxScreenツールを使用することです。 詳細については、このDigitalOceanTutorialをご覧ください。 CentOSシステムにscreenをインストールするには、yum install -y screenを実行するだけでよいことに注意してください。

要求/応答パターンを使用した単純なメッセージング


アプリケーション間の通信に関しては、要求/応答パターンはおそらく絶対的な古典を形成し、ZeroMQの基本的な基本から始める良い機会を与えてくれます。

ユースケース:

  • サーバーとクライアント間の単純な通信用。

  • 情報を確認し、更新を要求します。

  • チェックと更新をサーバーに送信します。

  • エコーまたはピン/ポンの実装。

使用されるソケットタイプ:

  • zmq.REP
  • zmq.REQ

サーバーの例: server.py


nano nano server.py)を使用して「 server.py 」を作成し、以下のわかりやすい内容を貼り付けます。

import zmq

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5678")

# Run a simple "Echo" server
while True:
    message = sock.recv()
    sock.send("Echo: " + message)
    print "Echo: " + message

編集が完了したら、CTRL+Xを押してからYを押して保存して終了します。

クライアントの例: client.py


nano nano client.py)を使用して「 client.py 」を作成し、以下の内容を貼り付けます。

import zmq
import sys

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
sock = context.socket(zmq.REQ)
sock.connect("tcp://127.0.0.1:5678")

# Send a "message" using the socket
sock.send(" ".join(sys.argv[1:]))
print sock.recv()

編集が完了したら、CTRL+Xを押してからYを押して保存して終了します。

注: ZeroMQライブラリを使用する場合は、各スレッドがメッセージの送信に使用されたことを忘れないでください(つまり、 .send(..))は、.recv(..)が続くことを期待しています。 ペアの実装に失敗すると、例外が発生します。

使用法


server.pyは、「エコー」アプリケーションとして機能するように設定されています。 私たちがそれに送ることを選んだものは何でも、それはそれを送り返します(例えば 「エコー:メッセージ」)。

Pythonインタープリターを使用してサーバーを実行します。

python server.py

別のウィンドウで、クライアントアプリケーションを使用してメッセージを送信します。

python client.py hello world!
# Echo: hello world!

注:サーバーをシャットダウンするには、次のキーの組み合わせを使用できます:Ctrl + C

パブリッシュ/サブスクライブパターンの操作


パブリッシュ/サブスクライブパターンの場合、ZeroMQを使用して1つ以上のサブスクライバーを確立し、1つ以上のパブリッシャーに接続して、パブリッシャーが送信するもの(またはシード)を継続的に受信します。

このパターンでは、プレフィックスを指定して、プレフィックスで始まるメッセージのみを受け入れるように選択できます。

ユースケース:

パブリッシュ/サブスクライブパターンは、さまざまなコンシューマーにメッセージを均等に分散するために使用されます。 スコアボードとニュースの自動更新は、このソリューションを使用する可能性のある領域と見なすことができます。

使用されるソケットタイプ:

  • zmq.PUB
  • zmq.SUB

出版社の例: pub.py


nano nano pub.py)を使用して「 pub.py 」を作成し、以下の内容を貼り付けます。

import zmq
import time

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
sock = context.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:5680")

id = 0

while True:
    time.sleep(1)
    id, now = id+1, time.ctime()
    
    # Message [prefix][message]
    message = "1-Update! >> #{id} >> {time}".format(id=id, time=now)
    sock.send(message)
    
    # Message [prefix][message]
    message = "2-Update! >> #{id} >> {time}".format(id=id, time=now) 
    sock.send(message)
    
    id += 1

編集が完了したら、CTRL+Xを押してからYを押して保存して終了します。

サブスクライバーの例: sub py


nano nano sub.py)を使用して「 py 」を作成し、以下の内容を貼り付けます。

import zmq

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
sock = context.socket(zmq.SUB)

# Define subscription and messages with prefix to accept.
sock.setsockopt(zmq.SUBSCRIBE, "1")
sock.connect("tcp://127.0.0.1:5680")

while True:
    message= sock.recv()
    print message

編集が完了したら、CTRL+Xを押してからYを押して保存して終了します。

注: .setsockopt(..)プロシージャを使用して、 string 1で始まるメッセージの受信をサブスクライブしています。 すべてを受け取るには、設定しないでください(つまり "")。

使用法


pub.pyは、パブリッシャーとして機能するように設定されており、異なるサブスクライバー向けに2つの異なるメッセージを同時に送信します。

パブリッシャーを実行してメッセージを送信します。

python pub.py

別のウィンドウで、サブスクライブされたコンテンツのプリントアウトを確認します(つまり、 1):

python sub.py!
# 1-Update! >> 1 >> Wed Dec 25 17:23:56 2013

注:サブスクライバーとパブリッシャーのアプリケーションをシャットダウンするには、次のキーの組み合わせを使用できます:Ctrl + C

Pub./Subのパイプライン化。 パイプラインパターン付き(プッシュ/プル)


パブリッシュ/サブスクライブパターンと非常によく似た3番目のインラインパイプラインパターンは、別の種類の問題(要求に応じてメッセージを配信する)の解決策として提供されます。

ユースケース:

パイプラインパターンは、キューに入れられたアイテムのリストをルーティングする必要がある場合に使用できます(つまり、 プッシュを一列に並べて)それを要求している人(つまり プル)する人。

使用されるソケットタイプ:

  • zmq.PUSH
  • zmq.PULL

PUSHの例: manager.py


nano nano manager.py)を使用して「 manager.py 」を作成し、以下の内容を貼り付けます。

import zmq
import time

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
sock = context.socket(zmq.PUSH)
sock.bind("tcp://127.0.0.1:5690")

id = 0

while True:
    time.sleep(1)
    id, now = id+1, time.ctime()

    # Message [id] - [message]
    message = "{id} - {time}".format(id=id, time=now)

    sock.send(message)

    print "Sent: {msg}".format(msg=message)

ファイルmanager.pyは、タスクアロケータとして機能します。

プルの例:worker_1.py


nano nano worker_1.py)を使用して「worker_1.py」を作成し、以下の内容を貼り付けます。

zmqをインポートする

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
sock = context.socket(zmq.PULL)
sock.connect("tcp://127.0.0.1:5690")

while True:
    message = sock.recv()
    print "Received: {msg}".format(msg=message)

ファイルworker_1.pyは、タスクプロセス(消費者/作業者)として機能します。

使用法


私たちのmanager.pyは、タスクのアロケータの役割を持つように設定されています(つまり マネージャー)、** PUSH**アイテムを。 同様に、 worker インスタンスとして機能するように設定されたworker_1.pyは、リストを** PULL **で処理すると、これらのアイテムを受け取ります。

パブリッシャーを実行してメッセージを送信します。

python manager.py

別のウィンドウで、サブスクライブされたコンテンツのプリントアウトを確認します(つまり、 1):

python worker_1.py!
# 1-Update! >> 1 >> Wed Dec 25 17:23:56 2013

注:サブスクライバーとパブリッシャーのアプリケーションをシャットダウンするには、次のキーの組み合わせを使用できます:Ctrl + C

排他的なペアパターン


排他的ペアパターンは、zmq/PAIRソケットタイプを使用して、ワントーンの種類の通信チャネルを確立することを意味し、許可します。

バインドの例: bind.py


nano nano bind.py)を使用して「 bind.py 」を作成し、以下の内容を貼り付けます。

import zmq

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
socket = context.socket(zmq.PAIR)
socket.bind("tcp://127.0.0.1:5696")

編集が完了したら、CTRL+Xを押してからYを押して保存して終了します。

接続例: connect.py


nano nano connect.py)を使用して「 connect.py 」を作成し、以下の内容を貼り付けます。

import zmq

# ZeroMQ Context
context = zmq.Context()

# Define the socket using the "Context"
socket = context.socket(zmq.PAIR)
socket.connect("tcp://127.0.0.1:5696")

編集が完了したら、CTRL+Xを押してからYを押して保存して終了します。

使用法


上記の例を使用して、双方向のユニ接続通信アプリケーションを作成できます。

注:どちらかをシャットダウンするには、次のキーの組み合わせを使用できます:Ctrl + C

投稿者: https ://twitter.com/ostezer