CeleryとRabbitMQを使用してUbuntuVPSでタスクをキューに入れる方法
序章
非同期または非ブロッキング処理は、特定のタスクの実行をプログラムのメインフローから分離する方法です。 これには、ユーザー向けのコードを中断することなく実行できるなど、いくつかの利点があります。
メッセージパッシングは、プログラムコンポーネントが情報の通信と交換に使用できる方法です。 同期または非同期で実装でき、個別のプロセスが問題なく通信できるようにします。 メッセージキューは多くの場合、追加機能を実装し、パフォーマンスを向上させ、完全にメモリ内に常駐できるため、このタイプの使用法では、従来のデータベースの代わりにメッセージパッシングが実装されることがよくあります。
Celery は、非同期メッセージパッシングシステム上に構築されたタスクキューです。 プログラミングタスクをダンプできるバケットとして使用できます。 タスクを通過したプログラムは、引き続き実行して応答して機能し、その後、セロリをポーリングして、計算が完了したかどうかを確認し、データを取得できます。
セロリはPythonで書かれていますが、そのプロトコルは任意の言語で実装できます。 Webhookを介して他の言語でも機能できます。
プログラムの環境にジョブキューを実装することにより、タスクを簡単にオフロードし、ユーザーからの対話を引き続き処理できます。 これは、アプリケーションの応答性を向上させ、長時間実行される計算の実行中にロックされないようにするための簡単な方法です。
このガイドでは、Ubuntu 12.04 VPSのメッセージングシステムとしてRabbitMQを使用して、セロリジョブキューをインストールして実装します。
コンポーネントをインストールします
Celeryをインストールする
CeleryはPythonで記述されているため、通常のPythonパッケージを処理するのと同じ方法で簡単にインストールできます。
メッセージングシステムをインストールするための仮想環境を作成することにより、Pythonパッケージを処理するための推奨手順に従います。 これにより、環境を安定させ、大規模なシステムに影響を与えないようにすることができます。
UbuntuのデフォルトリポジトリからPython仮想環境パッケージをインストールします。
sudo apt-get update
sudo apt-get install python-virtualenv
システムを実装するメッセージングディレクトリを作成します。
mkdir ~/messaging
cd ~/messaging
これで、次のコマンドを使用してセロリをインストールできる仮想環境を作成できます。
virtualenv --no-site-packages venv
仮想環境を構成したら、次のように入力してアクティブ化できます。
source venv/bin/activate
上記で作成した仮想環境で操作していることを反映して、プロンプトが変わります。 これにより、Pythonパッケージがグローバルではなくローカルにインストールされるようになります。
環境を非アクティブ化する必要がある場合(現在ではない)、次のように入力できます。
deactivate
環境をアクティブ化したので、pipを使用してセロリをインストールできます。
pip install celery
RabbitMQをインストールします
Celeryは、外部ソースからの要求を処理するためにメッセージングエージェントを必要とします。 このエージェントは「ブローカー」と呼ばれます。
リレーショナルデータベース、NoSQLデータベース、Key-Valueストア、実際のメッセージングシステムなど、ブローカーが選択できるオプションはかなりあります。
堅牢で安定したパフォーマンスを提供し、セロリとうまく相互作用するため、RabbitMQメッセージングシステムを使用するようにセロリを構成します。 使用目的に合った機能が含まれているため、優れたソリューションです。
UbuntuのリポジトリからRabbitMQをインストールできます。
sudo apt-get install rabbitmq-server
RabbitMQサービスは、インストール時にサーバー上で自動的に開始されます。
セロリインスタンスを作成する
セロリのタスクキューイング機能を使用するには、インストール後の最初のステップは、セロリインスタンスを作成することである必要があります。 これは、パッケージをインポートし、「アプリ」を作成してから、celeryがバックグラウンドで実行できるタスクを設定するという単純なプロセスです。
メッセージングディレクトリ内にPythonスクリプトを作成してみましょう。 tasks.py
ここで、ワーカーが実行できるタスクを定義できます。
sudo nano ~/messaging/tasks.py
最初にすべきことは、celeryパッケージからCelery関数をインポートすることです。
from celery import Celery
その後、デフォルトのRabbitMQサービスに接続するセロリアプリケーションインスタンスを作成できます。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
への最初の引数 Celery
関数は、タスクを識別するためにタスクの前に付けられる名前です。
The backend
parameterは、バックグラウンドタスクのステータスを照会したり、その結果を取得したりする場合に必要なオプションのパラメーターです。
タスクが、プログラムで使用するための有用な値を返さずに、何らかの作業を行ってから終了する単純な関数である場合は、このパラメーターを省略できます。 一部のタスクのみがこの機能を必要とする場合は、ここで有効にしてください。ケースバイケースで無効にすることができます。
The broker
パラメータは、ブローカーに接続するために必要なURLを指定します。 この場合、これはサーバーで実行されているRabbitMQサービスです。 RabbitMQは、「amqp」と呼ばれるプロトコルを使用して動作します。 RabbitMQがデフォルト構成で動作している場合、セロリは他の情報と接続できません。 amqp://
図式。
セロリタスクを構築する
まだこのファイルに、タスクを追加する必要があります。
各セロリタスクはデコレータで導入する必要があります @app.task
. これにより、セロリはキューイング機能を追加できる機能を識別できます。 各デコレータの後に、ワーカーが実行できる関数を作成するだけです。
最初のタスクは、文字列をコンソールに出力する単純な関数です。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task
def print_hello():
print 'hello there'
この関数は有用な情報を返さないため(代わりにコンソールに出力します)、このタスクに関する状態情報を格納するためにバックエンドを使用しないようにceleryに指示できます。 これは内部的にはそれほど複雑ではなく、必要なリソースも少なくて済みます。
セロリ輸入セロリからapp = Celery('tasks'、backend ='amqp'、broker ='amqp://')
@アプリ .task (ignore_result = True) def print_hello():print'hello there'
次に、素数を生成する別の関数を追加します( RosettaCode から取得)。 これは長時間実行されるプロセスになる可能性があるため、結果を待っているときに非同期ワーカープロセスを処理する方法の良い例です。
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
print 'hello there'
@app.task
def gen_prime(x):
multiples = []
results = []
for i in xrange(2, x+1):
if i not in multiples:
results.append(i)
for j in xrange(i*i, x+1, i):
multiples.append(j)
return results
この関数の戻り値が何であるかを気にし、いつ完了したかを知りたいので(結果などを使用できるように)、追加しません。 ignore_result
この2番目のタスクのパラメーター。
ファイルを保存して閉じます。
セロリワーカープロセスを開始する
これで、アプリケーションからの接続を受け入れることができるワーカープロセスを開始できます。 作成したファイルを使用して、実行できるタスクについて学習します。
ワーカーインスタンスの起動は、celeryコマンドでアプリケーション名を呼び出すのと同じくらい簡単です。 文字列の最後に「&」文字を含めて、ワーカープロセスをバックグラウンドに配置します。
celery worker -A tasks &
これにより、アプリケーションが起動し、ターミナルから切り離されて、他のタスクで引き続き使用できるようになります。
複数のワーカーを開始する場合は、それぞれに次の名前を付けることで開始できます。 -n
口論:
celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &
The %h
ワーカーに名前が付けられると、ホスト名に置き換えられます。
ワーカーを停止するには、killコマンドを使用できます。 プロセスIDを照会し、この情報に基づいてワーカーを削除できます。
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill
これにより、ワーカーは終了する前に現在のタスクを完了できます。
タスクが完了するのを待たずにすべてのワーカーをシャットダウンする場合は、次を実行できます。
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
キューを使用して作業を処理する
生成したワーカープロセスを使用して、プログラムのバックグラウンドで作業を完了することができます。
これがどのように機能するかを示すためにプログラム全体を作成する代わりに、Pythonインタープリターのさまざまなオプションを調べます。
python
プロンプトで、関数を環境にインポートできます。
from tasks import print_hello
from tasks import gen_prime
これらの機能をテストすると、特別な機能はないように見えます。 最初の関数は、期待どおりに行を出力します。
print_hello()
hello there
2番目の関数は、素数のリストを返します。
primes = gen_prime(1000)
print primes
2番目の関数にチェックする数値の範囲を広げると、次の計算中に実行がハングします。
primes = gen_prime(50000)
「CTRL-C」と入力して実行を停止します。 このプロセスは明らかにバックグラウンドで計算されていません。
バックグラウンドワーカーにアクセスするには、 .delay
方法。 セロリは、追加機能で機能をラップします。 このメソッドは、実行するワーカーに関数を渡すために使用されます。 すぐに戻るはずです:
primes = gen_prime.delay(50000)
このタスクは、以前に開始したワーカーによって実行されています。 を構成したため backend
アプリケーションのパラメータを使用すると、計算のステータスを確認して、結果にアクセスできます。
タスクが完了したかどうかを確認するには、 .ready
方法:
primes.ready()
False
「False」の値は、タスクがまだ実行中であり、結果がまだ利用できないことを意味します。 「True」の値を取得すると、その答えで何かを行うことができます。
primes.ready()
True
を使用して値を取得できます .get
方法。
値がで計算されていることをすでに確認している場合 .ready
メソッドの場合、次のようにそのメソッドを使用できます。
print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .
ただし、使用していない場合 .ready
呼び出す前のメソッド .get
、プログラムが結果を待つことを強制されないように「タイムアウト」オプションを追加することをお勧めします。これにより、実装の目的が損なわれます。
print primes.get(timeout=2)
これにより、タイムアウトした場合に例外が発生します。これは、プログラムで処理できます。
結論
これは、プログラム内でセロリの使用を開始するのに十分な情報ですが、このライブラリの全機能のほんの一部にすぎません。 Celeryを使用すると、バックグラウンドタスクをつなぎ合わせたり、タスクをグループ化したり、興味深い方法で機能を組み合わせたりすることができます。
セロリはPythonで書かれていますが、Webhookを介して他の言語で使用できます。 これにより、選択した言語に関係なく、タスクをバックグラウンドに移動するための柔軟性が大幅に向上します。