著者は、 Write for DOnations プログラムの一環として、 Free and Open SourceFundを選択して寄付を受け取りました。

序章

Luigi は、 Python パッケージであり、アイテムのバッチでのデータ処理ジョブの自動実行である、長時間実行されるバッチ処理を管理します。 Luigiを使用すると、データ処理ジョブを一連の依存タスクとして定義できます。 たとえば、タスクBはタスクAの出力に依存します。 また、タスクDはタスクBとタスクCの出力に依存します。 Luigiは、要求されたジョブを完了するために実行する必要のあるタスクを自動的に計算します。

全体として、Luigiは、データ処理パイプラインを開発および管理するためのフレームワークを提供します。 もともとはSpotifyによって開発されました。彼は、さまざまなソースからデータをフェッチして処理する必要があるタスクのコレクションをまとめて管理するために使用します。 Luigi内で、Spotifyの開発者は、障害の処理、タスク間の依存関係を自動的に解決する機能、タスク処理の視覚化など、バッチ処理のニーズに対応する機能を構築しました。 SpotifyはLuigiを使用して、ユーザーへの音楽の推奨事項の提供、内部ダッシュボードへの入力、トップソングのリストの計算など、バッチ処理ジョブをサポートします。

このチュートリアルでは、データ処理パイプラインを構築して、 ProjectGutenbergに関する最も人気のある本から最も一般的な単語を分析します。 これを行うには、Luigiパッケージを使用してパイプラインを構築します。 Luigiのタスク、ターゲット、依存関係、およびパラメーターを使用して、パイプラインを構築します。

前提条件

このチュートリアルを完了するには、次のものが必要です。

ステップ1—Luigiをインストールする

このステップでは、Luigiインストール用のクリーンなサンドボックス環境を作成します。

まず、プロジェクトディレクトリを作成します。 このチュートリアルの場合 luigi-demo:

  1. mkdir luigi-demo

新しく作成されたものに移動します luigi-demo ディレクトリ:

  1. cd luigi-demo

新しい仮想環境を作成する luigi-venv:

  1. python3 -m venv luigi-venv

そして、新しく作成された仮想環境をアクティブ化します。

  1. . luigi-venv/bin/activate

見つけるだろう (luigi-venv) ターミナルプロンプトの前面に追加され、アクティブな仮想環境を示します。

Output
(luigi-venv) [email protected]:~/luigi-demo$

このチュートリアルでは、次の3つのライブラリが必要です。 luigi, beautifulsoup4、 と requests. The requests ライブラリはHTTPリクエストの作成を合理化します。 これを使用して、ProjectGutenbergの書籍リストと分析する書籍をダウンロードします。 The beautifulsoup4 ライブラリは、Webページからのデータを解析する関数を提供します。 これを使用して、ProjectGutenbergサイトで最も人気のある本のリストを解析します。

次のコマンドを実行して、pipを使用してこれらのライブラリをインストールします。

  1. pip install wheel luigi beautifulsoup4 requests

最新バージョンのライブラリとそのすべての依存関係のインストールを確認する応答が表示されます。

Output
Successfully installed beautifulsoup4-4.9.1 certifi-2020.6.20 chardet-3.0.4 docutils-0.16 idna-2.10 lockfile-0.12.2 luigi-3.0.1 python-daemon-2.2.4 python-dateutil-2.8.1 requests-2.24.0 six-1.15.0 soupsieve-2.0.1 tornado-5.1.1 urllib3-1.25.10

プロジェクトの依存関係をインストールしました。 次に、最初のLuigiタスクの作成に進みます。

ステップ2—Luigiタスクを作成する

このステップでは、「Hello World」Luigiタスクを作成して、それらがどのように機能するかを示します。

Luigi task は、パイプラインの実行と、各タスクの入力および出力の依存関係の定義が行われる場所です。 タスクは、パイプラインを作成するための構成要素です。 それらをクラスで定義します。クラスには次のものが含まれます。

  • A run() タスクを実行するためのロジックを保持するメソッド。
  • アン output() タスクによって生成されたアーティファクトを返すメソッド。 The run() メソッドはこれらのアーティファクトを設定します。
  • オプション input() 現在のタスクを実行するために必要なパイプライン内の追加のタスクを返すメソッド。 The run() メソッドはこれらを使用してタスクを実行します。

新しいファイルを作成する hello-world.py:

  1. nano hello-world.py

次に、次のコードをファイルに追加します。

hello-world.py
import luigi

class HelloLuigi(luigi.Task):

    def output(self):
        return luigi.LocalTarget('hello-luigi.txt')

    def run(self):
        with self.output().open("w") as outfile:
            outfile.write("Hello Luigi!")

あなたはそれを定義します HelloLuigi() を追加することによるLuigiタスクです luigi.Task ミックスインに。

The output() メソッドは1つ以上を定義します Target タスクが生成する出力。 この例の場合、次のように定義します。 luigi.LocalTarget、ローカルファイルです。

注: Luigiを使用すると、 AWS S3バケット MongoDBデータベースSQLデータベースなどのさまざまな一般的なデータソースに接続できます。 サポートされているデータソースの完全なリストは、 Luigidocsにあります。

The run() メソッドには、パイプラインステージで実行するコードが含まれています。 この例では、 output() 書き込みモードのターゲットファイル、 self.output().open("w") as outfile: と書く "Hello Luigi!" それに outfile.write("Hello Luigi!").

作成したタスクを実行するには、次のコマンドを実行します。

  1. python -m luigi --module hello-world HelloLuigi --local-scheduler

ここでは、を使用してタスクを実行します python -m 実行する代わりに luigi 直接コマンド; これは、Luigiが実行できるのは現在のコード内にあるコードのみであるためです。 PYTHONPATH. または、追加することもできます PYTHONPATH='.' 次のように、Luigiコマンドの前に:

  1. PYTHONPATH='.' luigi --module hello-world HelloLuigi --local-scheduler

とともに --module hello-world HelloLuigi フラグを立てると、実行するPythonモジュールとLuigiタスクをLuigiに指示します。

The --local-scheduler フラグは、Luigiスケジューラーに接続せず、代わりにこのタスクをローカルで実行するようにLuigiに指示します。 (Luigiスケジューラについてはステップ4 で説明します。)を使用してタスクを実行する local-scheduler フラグは開発作業にのみ推奨されます。

Luigiは、実行されたタスクの要約を出力します。

Output
===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 HelloLuigi() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====

そして、それは新しいファイルを作成します hello-luigi.txt コンテンツ付き:

hello-luigi.txt
Hello Luigi!

ファイルを生成するLuigiタスクを作成し、Luigiを使用して実行しました local-scheduler. 次に、Webページから本のリストを抽出できるタスクを作成します。

ステップ3—本のリストを抽出するタスクを作成する

このステップでは、Luigiタスクを作成し、 run() ProjectGutenbergで最も人気のある本のリストをダウンロードするタスクのメソッド。 を定義します output() これらの本へのリンクをファイルに保存する方法。 これらは、Luigiローカルスケジューラを使用して実行します。

新しいディレクトリを作成します data あなたの内側 luigi-demo ディレクトリ。 これは、で定義されたファイルを保存する場所になります output() あなたのタスクのメソッド。 タスクを実行する前にディレクトリを作成する必要があります。まだ存在していないディレクトリにファイルを書き込もうとすると、Pythonは例外をスローします。

  1. mkdir data
  2. mkdir data/counts
  3. mkdir data/downloads

新しいファイルを作成する word-frequency.py:

  1. nano word-frequency.py

次のコードを挿入します。これは、 ProjectGutenbergで最も読まれている本へのリンクのリストを抽出するLuigiタスクです。

word-frequency.py
import requests
import luigi
from bs4 import BeautifulSoup


class GetTopBooks(luigi.Task):
    """
    Get list of the most popular books from Project Gutenberg
    """

    def output(self):
        return luigi.LocalTarget("data/books_list.txt")

    def run(self):
        resp = requests.get("http://www.gutenberg.org/browse/scores/top")

        soup = BeautifulSoup(resp.content, "html.parser")

        pageHeader = soup.find_all("h2", string="Top 100 EBooks yesterday")[0]
        listTop = pageHeader.find_next_sibling("ol")

        with self.output().open("w") as f:
            for result in listTop.select("li>a"):
                if "/ebooks/" in result["href"]:
                    f.write("http://www.gutenberg.org{link}.txt.utf-8\n"
                        .format(
                            link=result["href"]
                        )
                    )

あなたは output() ファイルのターゲット "data/books_list.txt" 本のリストを保存します。

の中に run() メソッド、あなた:

  • 使用 requests プロジェクトグーテンベルクのトップブックページのHTMLコンテンツをダウンロードするためのライブラリ。
  • 使用 BeautifulSoup ページのコンテンツを解析するためのライブラリ。 The BeautifulSoup ライブラリを使用すると、Webページから情報を取得できます。 の使用について詳しく知るには BeautifulSoup library美しいスープとPython3チュートリアルでWebページをスクレイプする方法をお読みください。
  • で定義された出力ファイルを開きます output() 方法。
  • HTML構造を繰り返し処理して、昨日のトップ100電子書籍リストのすべてのリンクを取得します。 このページでは、これはすべてのリンクを検索しています <a> リストアイテム内にある <li>. それらのリンクのそれぞれについて、それらが以下を含むリンクを指すページにリンクしている場合 /ebooks/、あなたはそれが本であると仮定して、あなたへのそのリンクを書くことができます output() ファイル。

完了したら、ファイルを保存して終了します。

次のコマンドを使用して、この新しいタスクを実行します。

  1. python -m luigi --module word-frequency GetTopBooks --local-scheduler

Luigiは、実行されたタスクの要約を出力します。

Output
===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 GetTopBooks() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====

の中に data ディレクトリ、Luigiは新しいファイルを作成します(data/books_list.txt). 次のコマンドを実行して、ファイルの内容を出力します。

cat data/books_list.txt

このファイルには、ProjectGutenbergの上位プロジェクトリストから抽出されたURLのリストが含まれています。

Output
http://www.gutenberg.org/ebooks/1342.txt.utf-8 http://www.gutenberg.org/ebooks/11.txt.utf-8 http://www.gutenberg.org/ebooks/2701.txt.utf-8 http://www.gutenberg.org/ebooks/1661.txt.utf-8 http://www.gutenberg.org/ebooks/16328.txt.utf-8 http://www.gutenberg.org/ebooks/45858.txt.utf-8 http://www.gutenberg.org/ebooks/98.txt.utf-8 http://www.gutenberg.org/ebooks/84.txt.utf-8 http://www.gutenberg.org/ebooks/5200.txt.utf-8 http://www.gutenberg.org/ebooks/51461.txt.utf-8 ...

Webページから本のリストを抽出できるタスクを作成しました。 次のステップでは、中央のLuigiスケジューラーをセットアップします。

ステップ4—Luigiスケジューラーの実行

次に、Luigiスケジューラーを起動して、タスクを実行および視覚化します。 ステップ3で開発したタスクを実行し、Luigiスケジューラーを使用して実行します。

これまで、Luigiを使用して実行しています --local-scheduler 中央スケジューラに作業を割り当てずにローカルでジョブを実行するためのタグ。 これは開発には役立ちますが、本番環境で使用する場合は、Luigiスケジューラーを使用することをお勧めします。 Luigiスケジューラーは以下を提供します:

  • タスクを実行するための中心点。
  • タスクの実行の視覚化。

Luigiスケジューラー・インターフェースにアクセスするには、ポートへのアクセスを有効にする必要があります 8082. これを行うには、次のコマンドを実行します。

  1. sudo ufw allow 8082/tcp

スケジューラーを実行するには、次のコマンドを実行します。

  1. sudo sh -c ". luigi-venv/bin/activate ;luigid --background --port 8082"

注:再実行しました virtualenv Luigiスケジューラーをバックグラウンドタスクとして起動する前に、スクリプトをrootとしてアクティブ化します。 これは、実行時に sudo the virtualenv 環境変数とエイリアスは引き継がれません。

ルートとして実行したくない場合は、現在のユーザーのバックグラウンドプロセスとしてLuigiスケジューラーを実行できます。 このコマンドは、Luigiスケジューラーをバックグラウンドで実行し、スケジューラーのバックグラウンドタスクからメッセージを非表示にします。 ターミナルでのバックグラウンドプロセスの管理の詳細については、Bashのジョブコントロールを使用してフォアグラウンドプロセスとバックグラウンドプロセスを管理する方法を参照してください。

  1. luigid --port 8082 > /dev/null 2> /dev/null &

ブラウザを開いてLuigiインターフェイスにアクセスします。 これはどちらかになります http://your_server_ip:8082、またはサーバーのドメインを設定している場合 http://your_domain:8082. これにより、Luigiユーザーインターフェイスが開きます。

デフォルトでは、LuigiタスクはLuigiスケジューラーを使用して実行されます。 Luigiスケジューラーを使用して以前のタスクの1つを実行するには、 --local-scheduler コマンドからの引数。 次のコマンドを使用して、ステップ3からタスクを再実行します。

  1. python -m luigi --module word-frequency GetTopBooks

Luigiスケジューラのユーザーインターフェイスを更新します。 GetTopBooksタスクが実行リストとその実行ステータスに追加されています。

引き続きこのユーザーインターフェイスを参照して、パイプラインの進行状況を監視します。

注: HTTPSを介してLuigiスケジューラーを保護したい場合は、Nginxを介して提供できます。 HTTPSを使用してNginxサーバーをセットアップするには、次の手順に従います。 Ubuntu20.04でLet’sEncryptを使用してNginxを保護する方法。 LuigiサーバーをNginxに接続するための適切なNginx構成の提案については、 Github –Luigi –Pull Request2785を参照してください。

Luigiスケジューラーを起動し、それを使用して実行されたタスクを視覚化しました。 次に、本のリストをダウンロードするタスクを作成します。 GetTopBooks() タスク出力。

ステップ5—本をダウンロードする

このステップでは、指定された本をダウンロードするためのLuigiタスクを作成します。 この新しく作成されたタスクとステップ3で作成されたタスクの間の依存関係を定義します。

最初にファイルを開きます。

  1. nano word-frequency.py

次のクラスを追加します GetTopBooks() へのタスク word-frequency.py 次のコードでファイルします。

word-frequency.py
. . .
class DownloadBooks(luigi.Task):
    """
    Download a specified list of books
    """
    FileID = luigi.IntParameter()

    REPLACE_LIST = """.,"';_[]:*-"""

    def requires(self):
        return GetTopBooks()

    def output(self):
        return luigi.LocalTarget("data/downloads/{}.txt".format(self.FileID))

    def run(self):
        with self.input().open("r") as i:
            URL = i.read().splitlines()[self.FileID]

            with self.output().open("w") as outfile:
                book_downloads = requests.get(URL)
                book_text = book_downloads.text

                for char in self.REPLACE_LIST:
                    book_text = book_text.replace(char, " ")

                book_text = book_text.lower()
                outfile.write(book_text)

このタスクでは、 Parameter; この場合、整数パラメーター。 Luigi parameters は、パイプラインの実行に影響を与えるタスクへの入力です。 ここではパラメータを紹介します FileID フェッチするURLのリストの行を指定します。

Luigiタスクにメソッドを追加しました。 def requires(); このメソッドでは、このタスクを実行する前に出力が必要なLuigiタスクを定義します。 あなたはの出力が必要です GetTopBooks() ステップ3で定義したタスク。

の中に output() メソッドでは、ターゲットを定義します。 あなたは FileID このステップで作成されたファイルの名前を作成するためのパラメーター。 この場合、フォーマットします data/downloads/{FileID}.txt.

の中に run() メソッド、あなた:

  • で生成された本のリストを開きます GetTopBooks() 仕事。
  • パラメータで指定された行からURLを取得します FileID.
  • 使用 requests URLから本の内容をダウンロードするためのライブラリ。
  • 本の中の特殊文字を次のように除外します :,.?、したがって、それらは単語分析に含まれません。
  • テキストを小文字に変換して、大文字と小文字が異なる単語を比較できるようにします。
  • フィルタリングされた出力をで指定されたファイルに書き込みます output() 方法。

ファイルを保存して終了します。

新しいを実行します DownloadBooks() このコマンドを使用するタスク:

  1. python -m luigi --module word-frequency DownloadBooks --FileID 2

このコマンドでは、 FileID を使用したパラメータ --FileID 口論。

注:パラメータをで定義するときは注意してください _ 名前に。 Luigiでそれらを参照するには、 _ のために -. たとえば、 File_ID パラメータは次のように参照されます --File-ID 端末からタスクを呼び出すとき。

次の出力が表示されます。

Output
===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 1 complete ones were encountered: - 1 GetTopBooks() * 1 ran successfully: - 1 DownloadBooks(FileID=2) This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====

Luigiが検出した出力から、次の出力がすでに生成されていることに注意してください。 GetTopBooks() そのタスクの実行をスキップしました。 この機能を使用すると、以前の実行で成功した出力を再利用できるため、実行する必要のあるタスクの数を最小限に抑えることができます。

別のタスクの出力を使用し、分析する本のセットをダウンロードするタスクを作成しました。 次のステップでは、ダウンロードした本の中で最も一般的な単語を数えるタスクを作成します。

ステップ6—単語を数え、結果を要約する

このステップでは、ステップ5でダウンロードした各本の単語の頻度をカウントするLuigiタスクを作成します。 これは、並行して実行される最初のタスクになります。

最初にファイルをもう一度開きます。

  1. nano word-frequency.py

次のインポートをの上部に追加します word-frequency.py:

word-frequency.py
from collections import Counter
import pickle

次のタスクをに追加します word-frequency.py、あなたの後 DownloadBooks() 仕事。 このタスクは、前のタスクの出力を取得します DownloadBooks() 指定された本のタスクであり、その本で最も一般的な単語を返します。

word-frequency.py
class CountWords(luigi.Task):
    """
    Count the frequency of the most common words from a file
    """

    FileID = luigi.IntParameter()

    def requires(self):
        return DownloadBooks(FileID=self.FileID)

    def output(self):
        return luigi.LocalTarget(
            "data/counts/count_{}.pickle".format(self.FileID),
            format=luigi.format.Nop
        )

    def run(self):
        with self.input().open("r") as i:
            word_count = Counter(i.read().split())

            with self.output().open("w") as outfile:
                pickle.dump(word_count, outfile)

定義するとき requires() あなたは合格します FileID 次のタスクへのパラメータ。 タスクが別のタスクに依存することを指定するときは、依存するタスクを実行するために必要なパラメーターを指定します。

の中に run() あなたの方法:

  • によって生成されたファイルを開きます DownloadBooks() 仕事。
  • ビルトインを使用する Counter collectionsライブラリのオブジェクト。 これは、本の中で最も一般的な単語を分析する簡単な方法を提供します。
  • 使用 pickle Pythonの出力を保存するライブラリ Counter オブジェクトなので、後のタスクでそのオブジェクトを再利用できます。 pickle は、Pythonオブジェクトをバイトストリームに変換するために使用するライブラリです。バイトストリームを保存して、後のPythonセッションに復元できます。 を設定する必要があります format のプロパティ luigi.LocalTarget バイナリ出力を書き込めるようにするには pickle ライブラリが生成します。

ファイルを保存して終了します。

新しいを実行します CountWords() このコマンドを使用するタスク:

  1. python -m luigi --module word-frequency CountWords --FileID 2

LuigiスケジューラのユーザーインターフェイスでCountWordsタスクグラフビューを開きます。

Hide Done オプションの選択を解除し、 UpstreamDependenciesの選択を解除します。 作成したタスクから実行の流れがわかります。

ダウンロードした本で最も一般的な単語を数えるタスクを作成し、それらのタスク間の依存関係を視覚化しました。 次に、タスクの実行をカスタマイズするために使用できるパラメーターを定義します。

ステップ7—構成パラメーターの定義

このステップでは、構成パラメーターをパイプラインに追加します。 これらにより、分析する本の数と結果に含める単語の数をカスタマイズできます。

タスク間で共有されるパラメータを設定する場合は、 Config() クラス。 他のパイプラインステージは、で定義されたパラメータを参照できます。 Config() クラス; これらは、ジョブの実行時にパイプラインによって設定されます。

以下を追加します Config() クラスの最後まで word-frequency.py. これにより、分析する本の数と要約に含める最も頻繁な単語の数について、パイプラインに2つの新しいパラメーターが定義されます。

word-frequency.py
class GlobalParams(luigi.Config):
    NumberBooks = luigi.IntParameter(default=10)
    NumberTopWords = luigi.IntParameter(default=500)

次のクラスをに追加します word-frequency.py. このクラスは、すべての結果を集約します CountWords() 最も頻繁に使用される単語の要約を作成するタスク:

word-frequency.py
class TopWords(luigi.Task):
    """
    Aggregate the count results from the different files
    """

    def requires(self):
        requiredInputs = []
        for i in range(GlobalParams().NumberBooks):
            requiredInputs.append(CountWords(FileID=i))
        return requiredInputs

    def output(self):
        return luigi.LocalTarget("data/summary.txt")

    def run(self):
        total_count = Counter()
        for input in self.input():
            with input.open("rb") as infile:
                nextCounter = pickle.load(infile)
                total_count += nextCounter

        with self.output().open("w") as f:
            for item in total_count.most_common(GlobalParams().NumberTopWords):
                f.write("{0: <15}{1}\n".format(*item))

の中に requires() メソッドでは、タスクが複数の依存タスクの出力を使用する場所のリストを提供できます。 あなたは GlobalParams().NumberBooks 単語数が必要な本の数を設定するパラメータ。

の中に output() メソッド、あなたは定義します data/summary.txt パイプラインの最終出力となる出力ファイル。

の中に run() あなたの方法:

  • 作成する Counter() 合計数を格納するオブジェクト。
  • ファイルを開き、「アンピック」(ファイルからPythonオブジェクトに変換)します。 CountWords() 方法
  • ロードされたカウントを追加し、それを合計カウントに追加します。
  • 最も一般的な単語をターゲット出力ファイルに書き込みます。

次のコマンドでパイプラインを実行します。

  1. python -m luigi --module word-frequency TopWords --GlobalParams-NumberBooks 15 --GlobalParams-NumberTopWords 750

Luigiは、上位の単語の要約を生成するために必要な残りのタスクを実行します。

Output
===== Luigi Execution Summary ===== Scheduled 31 tasks of which: * 2 complete ones were encountered: - 1 CountWords(FileID=2) - 1 GetTopBooks() * 29 ran successfully: - 14 CountWords(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9) - 14 DownloadBooks(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9) - 1 TopWords() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====

Luigiスケジューラーからパイプラインの実行を視覚化できます。 タスクリストからGetTopBooksタスクを選択し、グラフの表示ボタンを押します。

HideDoneおよびUpstreamDependenciesオプションの選択を解除します。

Luigiで行われている処理の流れを示します。

を開きます data/summary.txt ファイル:

cat data/summary.txt

計算された最も一般的な単語が見つかります:

Output
the 64593 and 41650 of 31896 to 31368 a 25265 i 23449 in 19496 it 16282 that 15907 he 14974 ...

このステップでは、パラメーターを定義して使用し、タスクの実行をカスタマイズしました。 一連の本の最も一般的な単語の要約を生成しました。

このリポジトリでこのチュートリアルのすべてのコードを見つけてください。

結論

このチュートリアルでは、Luigiデータ処理パイプラインと、タスク、パラメーター、構成パラメーター、Luigiスケジューラーなどの主要な機能の使用方法を紹介しました。

Luigiは、箱から出して多数の一般的なデータソースへの接続をサポートしています。 また、大規模で複雑なデータパイプラインを実行するようにスケーリングすることもできます。 これは、データ処理の課題の解決を開始するための強力なフレームワークを提供します。

その他のチュートリアルについては、データ分析トピックページおよびPythonトピックページをご覧ください。