序章
Python threads は、プログラムが複数のプロシージャを一度に実行できるようにする並列処理の形式です。 Pythonの並列処理は、複数のプロセスを使用して実現することもできますが、スレッドは、大量のI / O(入力/出力)を伴うアプリケーションの高速化に特に適しています。
例I/ Oバウンド操作には、Web要求の作成とファイルからのデータの読み取りが含まれます。 I / Oバウンド操作とは対照的に、 CPUバウンド操作(Python標準ライブラリで数学を実行するなど)は、Pythonスレッドの恩恵をあまり受けません。
Python3には ThreadPoolExecutor
スレッドでコードを実行するためのユーティリティ。
このチュートリアルでは、 ThreadPoolExecutor
ネットワーク要求を適切に行うため。 スレッド内での呼び出しに適した関数を定義し、使用します ThreadPoolExecutor
その関数を実行し、それらの実行の結果を処理します。
このチュートリアルでは、Wikipediaページの存在を確認するためにネットワークリクエストを行います。
注: I / Oバウンド操作がCPUバウンド操作よりもスレッドの恩恵を受けるという事実は、グローバルインタープリターロックと呼ばれるPythonの特異性が原因です。 必要に応じて、Pythonのグローバルインタープリターロックの詳細については、Pythonの公式ドキュメントを参照してください。
前提条件
このチュートリアルを最大限に活用するには、PythonでのプログラミングとローカルのPythonプログラミング環境にある程度精通していることをお勧めします。 requests
インストールされています。
必要な背景情報については、次のチュートリアルを確認できます。
-
をインストールするには
requests
ローカルPythonプログラミング環境にパッケージ化すると、次のコマンドを実行できます。
- pip install --user requests==2.23.0
ステップ1—スレッドで実行する関数を定義する
スレッドを使用して実行する関数を定義することから始めましょう。
使用する nano
または、お好みのテキストエディタ/開発環境で、次のファイルを開くことができます。
- nano wiki_page_function.py
このチュートリアルでは、ウィキペディアのページが存在するかどうかを判断する関数を記述します。
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
The get_wiki_page_existence
関数は2つの引数を受け入れます:ウィキペディアページへのURL(wiki_page_url
)、および timeout
そのURLからの応答を待機する秒数。
get_wiki_page_existence
requests パッケージを使用して、そのURLへのWebリクエストを作成します。 HTTPのステータスコードによって異なります response
、ページが存在するかどうかを説明する文字列が返されます。 さまざまなステータスコードは、HTTPリクエストのさまざまな結果を表します。 この手順は、 200
「成功」ステータスコードは、ウィキペディアのページが存在し、 404
「見つかりません」ステータスコードは、ウィキペディアページが存在しないことを意味します。
前提条件のセクションで説明されているように、 requests
この関数を実行するためにインストールされたパッケージ。
を追加して関数を実行してみましょう url
および次の関数呼び出し get_wiki_page_existence
関数:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
コードを追加したら、ファイルを保存して閉じます。
このコードを実行すると、次のようになります。
- python wiki_page_function.py
次のような出力が表示されます。
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
を呼び出す get_wiki_page_existence
有効なウィキペディアページを持つ関数は、ページが実際に存在することを確認する文字列を返します。
警告:一般に、同時実行のバグを回避するために特別な注意を払わずに、Pythonオブジェクトまたは状態をスレッド間で共有することは安全ではありません。 スレッドで実行する関数を定義するときは、単一のジョブを実行し、状態を他のスレッドと共有または公開しない関数を定義するのが最適です。 get_wiki_page_existence
そのような関数の例です。
ステップ2—ThreadPoolExecutorを使用してスレッドで関数を実行する
スレッドを使用した呼び出しに適した関数ができたので、次を使用できます。 ThreadPoolExecutor
その関数の複数の呼び出しを適切に実行します。
次の強調表示されたコードをプログラムに追加しましょう wiki_page_function.py
:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
このコードがどのように機能するかを見てみましょう。
concurrent.futures
アクセスを許可するためにインポートされますThreadPoolExecutor
.- A
with
ステートメントは、を作成するために使用されますThreadPoolExecutor
実例executor
これにより、完了時にスレッドがすぐにクリーンアップされます。 - 4つの仕事は
submitted
にexecutor
:内のURLごとに1つwiki_page_urls
リスト。 - への各呼び出し
submit
に格納されているFutureインスタンスを返しますfutures
リスト。 - The
as_completed
関数はそれぞれを待機しますFuture
get_wiki_page_existence
結果を印刷できるように、completeを呼び出します。
次のコマンドを使用して、このプログラムを再度実行すると、次のようになります。
- python wiki_page_function.py
次のような出力が表示されます。
Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
この出力は理にかなっています。3つのURLは有効なウィキペディアページであり、そのうちの1つは有効なWikipediaページです。 this_page_does_not_exist
ではありません。 出力の順序がこの出力と異なる場合があることに注意してください。 The concurrent.futures.as_completed
この例の関数は、ジョブが送信された順序に関係なく、結果が利用可能になるとすぐに結果を返します。
ステップ3—スレッドで実行される関数からの例外の処理
前のステップでは、 get_wiki_page_existence
すべての呼び出しに対して正常に値を返しました。 このステップでは、次のことがわかります ThreadPoolExecutor
スレッド化された関数の呼び出しで生成された例外を発生させることもできます。
次のサンプルコードブロックを考えてみましょう。
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
このコードブロックは、手順2で使用したものとほぼ同じですが、2つの重要な違いがあります。
- 私たちは今合格します
timeout=0.00001
にget_wiki_page_existence
. 以来requests
パッケージは、ウィキペディアへのWebリクエストを完了できません。0.00001
秒、それは発生しますConnectTimeout
例外。 - キャッチ
ConnectTimeout
によって発生した例外future.result()
毎回文字列を出力します。
プログラムを再度実行すると、次の出力が表示されます。
OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
四 ConnectTimeout
メッセージが印刷されます—4つごとに1つ wiki_page_urls
、それらのどれもで完了することができなかったので 0.00001
秒と4つのそれぞれ get_wiki_page_existence
呼び出しが発生しました ConnectTimeout
例外。
これで、関数呼び出しが ThreadPoolExecutor
例外を発生させた場合、その例外を呼び出すことで通常どおり発生する可能性があります Future.result
. 呼び出し Future.result
送信されたすべての呼び出しで、プログラムがスレッド化された関数から発生した例外を見逃さないようにします。
ステップ4—スレッドがある場合とない場合の実行時間の比較
それでは、 ThreadPoolExecutor
実際にあなたのプログラムをより速くします。
まず、時間をかけましょう get_wiki_page_existence
スレッドなしで実行した場合:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
コード例では、 get_wiki_page_existence
50の異なるウィキペディアページのURLを1つずつ使用して機能します。 time.time()関数を使用して、プログラムの実行にかかる秒数を出力します。
以前と同じようにこのコードを再度実行すると、次のような出力が表示されます。
OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
この出力のエントリ2〜47は、簡潔にするために省略されています。
後に印刷される秒数 Without threads time
マシンで実行すると異なります。これで問題ありません。ベースライン番号を取得して、を使用するソリューションと比較します。 ThreadPoolExecutor
. この場合、 ~5.803
秒。
同じ50のウィキペディアURLを実行してみましょう get_wiki_page_existence
、しかし今回は使用 ThreadPoolExecutor
:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
このコードは、ステップ2で作成したコードと同じですが、コードの実行にかかる秒数を示すいくつかのprintステートメントが追加されています。
プログラムを再度実行すると、次のように表示されます。
OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
繰り返しますが、後に印刷される秒数 Threaded time
コンピュータによって異なります(出力の順序も異なります)。
スレッドがある場合とない場合で、50個のウィキペディアページのURLを取得するための実行時間を比較できるようになりました。
このチュートリアルで使用したマシンでは、スレッドなしで ~5.803
秒、そしてスレッドでかかった ~1.220
秒。 私たちのプログラムは、スレッドを使用すると大幅に高速に実行されました。
結論
このチュートリアルでは、 ThreadPoolExecutor
I/Oバウンドのコードを効率的に実行するためのPython3のユーティリティ。 スレッド内での呼び出しに適した関数を作成し、その関数のスレッド実行から出力と例外の両方を取得する方法を学び、スレッドを使用することで得られるパフォーマンスの向上を観察しました。
ここから、concurrent.futuresモジュールによって提供される他の同時実行機能について詳しく知ることができます。