序章

Python threads は、プログラムが複数のプロシージャを一度に実行できるようにする並列処理の形式です。 Pythonの並列処理は、複数のプロセスを使用して実現することもできますが、スレッドは、大量のI / O(入力/出力)を伴うアプリケーションの高速化に特に適しています。

I/ Oバウンド操作には、Web要求の作成とファイルからのデータの読み取りが含まれます。 I / Oバウンド操作とは対照的に、 CPUバウンド操作(Python標準ライブラリで数学を実行するなど)は、Pythonスレッドの恩恵をあまり受けません。

Python3には ThreadPoolExecutor スレッドでコードを実行するためのユーティリティ。

このチュートリアルでは、 ThreadPoolExecutor ネットワーク要求を適切に行うため。 スレッド内での呼び出しに適した関数を定義し、使用します ThreadPoolExecutor その関数を実行し、それらの実行の結果を処理します。

このチュートリアルでは、Wikipediaページの存在を確認するためにネットワークリクエストを行います。

注: I / Oバウンド操作がCPUバウンド操作よりもスレッドの恩恵を受けるという事実は、グローバルインタープリターロックと呼ばれるPythonの特異性が原因です。 必要に応じて、Pythonのグローバルインタープリターロックの詳細については、Pythonの公式ドキュメントを参照してください。

前提条件

このチュートリアルを最大限に活用するには、PythonでのプログラミングとローカルのPythonプログラミング環境にある程度精通していることをお勧めします。 requests インストールされています。

必要な背景情報については、次のチュートリアルを確認できます。

  1. pip install --user requests==2.23.0

ステップ1—スレッドで実行する関数を定義する

スレッドを使用して実行する関数を定義することから始めましょう。

使用する nano または、お好みのテキストエディタ/開発環境で、次のファイルを開くことができます。

  1. nano wiki_page_function.py

このチュートリアルでは、ウィキペディアのページが存在するかどうかを判断する関数を記述します。

wiki_page_function.py
import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

The get_wiki_page_existence 関数は2つの引数を受け入れます:ウィキペディアページへのURL(wiki_page_url)、および timeout そのURLからの応答を待機する秒数。

get_wiki_page_existence requests パッケージを使用して、そのURLへのWebリクエストを作成します。 HTTPのステータスコードによって異なります response、ページが存在するかどうかを説明する文字列が返されます。 さまざまなステータスコードは、HTTPリクエストのさまざまな結果を表します。 この手順は、 200 「成功」ステータスコードは、ウィキペディアのページが存在し、 404 「見つかりません」ステータスコードは、ウィキペディアページが存在しないことを意味します。

前提条件のセクションで説明されているように、 requests この関数を実行するためにインストールされたパッケージ。

を追加して関数を実行してみましょう url および次の関数呼び出し get_wiki_page_existence 関数:

wiki_page_function.py
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

コードを追加したら、ファイルを保存して閉じます。

このコードを実行すると、次のようになります。

  1. python wiki_page_function.py

次のような出力が表示されます。

Output
https://en.wikipedia.org/wiki/Ocean - exists

を呼び出す get_wiki_page_existence 有効なウィキペディアページを持つ関数は、ページが実際に存在することを確認する文字列を返します。

警告:一般に、同時実行のバグを回避するために特別な注意を払わずに、Pythonオブジェクトまたは状態をスレッド間で共有することは安全ではありません。 スレッドで実行する関数を定義するときは、単一のジョブを実行し、状態を他のスレッドと共有または公開しない関数を定義するのが最適です。 get_wiki_page_existence そのような関数の例です。

ステップ2—ThreadPoolExecutorを使用してスレッドで関数を実行する

スレッドを使用した呼び出しに適した関数ができたので、次を使用できます。 ThreadPoolExecutor その関数の複数の呼び出しを適切に実行します。

次の強調表示されたコードをプログラムに追加しましょう wiki_page_function.py:

wiki_page_function.py
import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

このコードがどのように機能するかを見てみましょう。

  • concurrent.futures アクセスを許可するためにインポートされます ThreadPoolExecutor.
  • A with ステートメントは、を作成するために使用されます ThreadPoolExecutor 実例 executor これにより、完了時にスレッドがすぐにクリーンアップされます。
  • 4つの仕事は submittedexecutor:内のURLごとに1つ wiki_page_urls リスト。
  • への各呼び出し submit に格納されているFutureインスタンスを返します futures リスト。
  • The as_completed 関数はそれぞれを待機します Future get_wiki_page_existence 結果を印刷できるように、completeを呼び出します。

次のコマンドを使用して、このプログラムを再度実行すると、次のようになります。

  1. python wiki_page_function.py

次のような出力が表示されます。

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

この出力は理にかなっています。3つのURLは有効なウィキペディアページであり、そのうちの1つは有効なWikipediaページです。 this_page_does_not_exist ではありません。 出力の順序がこの出力と異なる場合があることに注意してください。 The concurrent.futures.as_completed この例の関数は、ジョブが送信された順序に関係なく、結果が利用可能になるとすぐに結果を返します。

ステップ3—スレッドで実行される関数からの例外の処理

前のステップでは、 get_wiki_page_existence すべての呼び出しに対して正常に値を返しました。 このステップでは、次のことがわかります ThreadPoolExecutor スレッド化された関数の呼び出しで生成された例外を発生させることもできます。

次のサンプルコードブロックを考えてみましょう。

wiki_page_function.py
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

このコードブロックは、手順2で使用したものとほぼ同じですが、2つの重要な違いがあります。

  • 私たちは今合格します timeout=0.00001get_wiki_page_existence. 以来 requests パッケージは、ウィキペディアへのWebリクエストを完了できません。 0.00001 秒、それは発生します ConnectTimeout 例外。
  • キャッチ ConnectTimeout によって発生した例外 future.result() 毎回文字列を出力します。

プログラムを再度実行すると、次の出力が表示されます。

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

ConnectTimeout メッセージが印刷されます—4つごとに1つ wiki_page_urls、それらのどれもで完了することができなかったので 0.00001 秒と4つのそれぞれ get_wiki_page_existence 呼び出しが発生しました ConnectTimeout 例外。

これで、関数呼び出しが ThreadPoolExecutor 例外を発生させた場合、その例外を呼び出すことで通常どおり発生する可能性があります Future.result. 呼び出し Future.result 送信されたすべての呼び出しで、プログラムがスレッド化された関数から発生した例外を見逃さないようにします。

ステップ4—スレッドがある場合とない場合の実行時間の比較

それでは、 ThreadPoolExecutor 実際にあなたのプログラムをより速くします。

まず、時間をかけましょう get_wiki_page_existence スレッドなしで実行した場合:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

コード例では、 get_wiki_page_existence 50の異なるウィキペディアページのURLを1つずつ使用して機能します。 time.time()関数を使用して、プログラムの実行にかかる秒数を出力します。

以前と同じようにこのコードを再度実行すると、次のような出力が表示されます。

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

この出力のエントリ2〜47は、簡潔にするために省略されています。

後に印刷される秒数 Without threads time マシンで実行すると異なります。これで問題ありません。ベースライン番号を取得して、を使用するソリューションと比較します。 ThreadPoolExecutor. この場合、 ~5.803 秒。

同じ50のウィキペディアURLを実行してみましょう get_wiki_page_existence、しかし今回は使用 ThreadPoolExecutor:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

このコードは、ステップ2で作成したコードと同じですが、コードの実行にかかる秒数を示すいくつかのprintステートメントが追加されています。

プログラムを再度実行すると、次のように表示されます。

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

繰り返しますが、後に印刷される秒数 Threaded time コンピュータによって異なります(出力の順序も異なります)。

スレッドがある場合とない場合で、50個のウィキペディアページのURLを取得するための実行時間を比較できるようになりました。

このチュートリアルで使用したマシンでは、スレッドなしで ~5.803 秒、そしてスレッドでかかった ~1.220 秒。 私たちのプログラムは、スレッドを使用すると大幅に高速に実行されました。

結論

このチュートリアルでは、 ThreadPoolExecutor I/Oバウンドのコードを効率的に実行するためのPython3のユーティリティ。 スレッド内での呼び出しに適した関数を作成し、その関数のスレッド実行から出力と例外の両方を取得する方法を学び、スレッドを使用することで得られるパフォーマンスの向上を観察しました。

ここから、concurrent.futuresモジュールによって提供される他の同時実行機能について詳しく知ることができます。