Ubuntu20.04のPythonでLuigiを使用してデータ処理パイプラインを構築する方法
著者は、 Write for DOnations プログラムの一環として、 Free and Open SourceFundを選択して寄付を受け取りました。
序章
Luigi は、 Python パッケージであり、アイテムのバッチでのデータ処理ジョブの自動実行である、長時間実行されるバッチ処理を管理します。 Luigiを使用すると、データ処理ジョブを一連の依存タスクとして定義できます。 たとえば、タスクBはタスクAの出力に依存します。 また、タスクDはタスクBとタスクCの出力に依存します。 Luigiは、要求されたジョブを完了するために実行する必要のあるタスクを自動的に計算します。
全体として、Luigiは、データ処理パイプラインを開発および管理するためのフレームワークを提供します。 もともとはSpotifyによって開発されました。彼は、さまざまなソースからデータをフェッチして処理する必要があるタスクのコレクションをまとめて管理するために使用します。 Luigi内で、Spotifyの開発者は、障害の処理、タスク間の依存関係を自動的に解決する機能、タスク処理の視覚化など、バッチ処理のニーズに対応する機能を構築しました。 SpotifyはLuigiを使用して、ユーザーへの音楽の推奨事項の提供、内部ダッシュボードへの入力、トップソングのリストの計算など、バッチ処理ジョブをサポートします。
このチュートリアルでは、データ処理パイプラインを構築して、 ProjectGutenbergに関する最も人気のある本から最も一般的な単語を分析します。 これを行うには、Luigiパッケージを使用してパイプラインを構築します。 Luigiのタスク、ターゲット、依存関係、およびパラメーターを使用して、パイプラインを構築します。
前提条件
このチュートリアルを完了するには、次のものが必要です。
sudo
権限を持つroot以外のユーザーでセットアップされたUbuntuサーバー。 Ubuntu20.04を使用したサーバーの初期設定ガイドに従ってください。- Python3.6以降および
virtualenv
がインストールされている。 Python3をインストールしてUbuntu20.04にローカルプログラミング環境をセットアップする方法に従って、Pythonを構成してvirtualenv
をインストールします。 このチュートリアルでは、環境フォルダーとプロジェクトフォルダーを設定します。
ステップ1—Luigiをインストールする
このステップでは、Luigiインストール用のクリーンなサンドボックス環境を作成します。
まず、プロジェクトディレクトリを作成します。 このチュートリアルの場合luigi-demo
:
- mkdir luigi-demo
新しく作成されたluigi-demo
ディレクトリに移動します。
- cd luigi-demo
新しい仮想環境を作成しますluigi-venv
:
- python3 -m venv luigi-venv
そして、新しく作成された仮想環境をアクティブ化します。
- . luigi-venv/bin/activate
ターミナルプロンプトの前に(luigi-venv)
が追加され、アクティブな仮想環境を示します。
Output(luigi-venv) username@hostname:~/luigi-demo$
このチュートリアルでは、luigi
、beautifulsoup4
、およびrequests
の3つのライブラリが必要です。 requests
ライブラリは、HTTPリクエストの作成を合理化します。 これを使用して、ProjectGutenbergの書籍リストと分析する書籍をダウンロードします。 beautifulsoup4
ライブラリは、Webページからのデータを解析するための関数を提供します。 これを使用して、ProjectGutenbergサイトで最も人気のある本のリストを解析します。
次のコマンドを実行して、pipを使用してこれらのライブラリをインストールします。
- pip install wheel luigi beautifulsoup4 requests
最新バージョンのライブラリとそのすべての依存関係のインストールを確認する応答が表示されます。
OutputSuccessfully installed beautifulsoup4-4.9.1 certifi-2020.6.20 chardet-3.0.4 docutils-0.16 idna-2.10 lockfile-0.12.2 luigi-3.0.1 python-daemon-2.2.4 python-dateutil-2.8.1 requests-2.24.0 six-1.15.0 soupsieve-2.0.1 tornado-5.1.1 urllib3-1.25.10
プロジェクトの依存関係をインストールしました。 次に、最初のLuigiタスクの作成に進みます。
ステップ2—Luigiタスクを作成する
このステップでは、「Hello World」Luigiタスクを作成して、それらがどのように機能するかを示します。
Luigi task は、パイプラインの実行と、各タスクの入力および出力の依存関係の定義が行われる場所です。 タスクは、パイプラインを作成するための構成要素です。 それらをクラスで定義します。クラスには次のものが含まれます。
- タスクを実行するためのロジックを保持する
run()
メソッド。 - タスクによって生成されたアーティファクトを返す
output()
メソッド。run()
メソッドは、これらのアーティファクトにデータを入力します。 - 現在のタスクを実行するために必要なパイプライン内の追加のタスクを返すオプションの
input()
メソッド。run()
メソッドは、これらを使用してタスクを実行します。
新しいファイルを作成しますhello-world.py
:
- nano hello-world.py
次に、次のコードをファイルに追加します。
import luigi
class HelloLuigi(luigi.Task):
def output(self):
return luigi.LocalTarget('hello-luigi.txt')
def run(self):
with self.output().open("w") as outfile:
outfile.write("Hello Luigi!")
HelloLuigi()
がLuigiタスクであることを定義するには、luigi.Task
mixinを追加します。
output()
メソッドは、タスクが生成する1つ以上のTarget
出力を定義します。 この例の場合、ローカルファイルであるluigi.LocalTarget
を定義します。
注: Luigiを使用すると、 AWS S3バケット、 MongoDBデータベース、SQLデータベースなどのさまざまな一般的なデータソースに接続できます。 サポートされているデータソースの完全なリストは、 Luigidocsにあります。
run()
メソッドには、パイプラインステージで実行するコードが含まれています。 この例では、output()
ターゲットファイルを書き込みモードself.output().open("w") as outfile:
で開き、"Hello Luigi!"
をoutfile.write("Hello Luigi!")
で書き込みます。
作成したタスクを実行するには、次のコマンドを実行します。
- python -m luigi --module hello-world HelloLuigi --local-scheduler
ここでは、luigi
コマンドを直接実行する代わりに、python -m
を使用してタスクを実行します。 これは、Luigiが実行できるのは現在のPYTHONPATH
内のコードのみであるためです。 または、次のように、Luigiコマンドの前にPYTHONPATH='.'
を追加することもできます。
- PYTHONPATH='.' luigi --module hello-world HelloLuigi --local-scheduler
--module hello-world HelloLuigi
フラグを使用して、実行するPythonモジュールとLuigiタスクをLuigiに指示します。
--local-scheduler
フラグは、LuigiにLuigiスケジューラーに接続せず、代わりにこのタスクをローカルで実行するように指示します。 (Luigiスケジューラについてはステップ4 で説明します。)local-scheduler
フラグを使用してタスクを実行することは、開発作業にのみ推奨されます。
Luigiは、実行されたタスクの要約を出力します。
Output===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 HelloLuigi()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
そして、次の内容の新しいファイルhello-luigi.txt
を作成します。
Hello Luigi!
ファイルを生成するLuigiタスクを作成し、Luigilocal-scheduler
を使用して実行しました。 次に、Webページから本のリストを抽出できるタスクを作成します。
ステップ3—本のリストを抽出するタスクを作成する
このステップでは、Luigiタスクを作成し、タスクのrun()
メソッドを定義して、 ProjectGutenbergで最も人気のある本のリストをダウンロードします。 これらの本へのリンクをファイルに保存するoutput()
メソッドを定義します。 これらは、Luigiローカルスケジューラを使用して実行します。
luigi-demo
ディレクトリ内に新しいディレクトリdata
を作成します。 これは、タスクのoutput()
メソッドで定義されたファイルを保存する場所になります。 タスクを実行する前にディレクトリを作成する必要があります。まだ存在していないディレクトリにファイルを書き込もうとすると、Pythonは例外をスローします。
- mkdir data
- mkdir data/counts
- mkdir data/downloads
新しいファイルを作成しますword-frequency.py
:
- nano word-frequency.py
次のコードを挿入します。これは、 ProjectGutenbergで最も読まれている本へのリンクのリストを抽出するLuigiタスクです。
import requests
import luigi
from bs4 import BeautifulSoup
class GetTopBooks(luigi.Task):
"""
Get list of the most popular books from Project Gutenberg
"""
def output(self):
return luigi.LocalTarget("data/books_list.txt")
def run(self):
resp = requests.get("http://www.gutenberg.org/browse/scores/top")
soup = BeautifulSoup(resp.content, "html.parser")
pageHeader = soup.find_all("h2", string="Top 100 EBooks yesterday")[0]
listTop = pageHeader.find_next_sibling("ol")
with self.output().open("w") as f:
for result in listTop.select("li>a"):
if "/ebooks/" in result["href"]:
f.write("http://www.gutenberg.org{link}.txt.utf-8\n"
.format(
link=result["href"]
)
)
書籍のリストを保存するファイル"data/books_list.txt"
のoutput()
ターゲットを定義します。
run()
メソッドでは、次のことを行います。
requests
ライブラリを使用して、ProjectGutenbergのトップブックページのHTMLコンテンツをダウンロードします。BeautifulSoup
ライブラリを使用して、ページのコンテンツを解析します。BeautifulSoup
ライブラリを使用すると、Webページから情報を取得できます。BeautifulSoup library
の使用方法の詳細については、美しいスープとPython3でWebページをスクレイプする方法のチュートリアルをお読みください。output()
メソッドで定義された出力ファイルを開きます。- HTML構造を繰り返し処理して、昨日のトップ100電子書籍リストのすべてのリンクを取得します。 このページでは、リストアイテム
<li>
内にあるすべてのリンク<a>
を検索しています。 これらのリンクのそれぞれについて、/ebooks/
を含むリンクを指すページにリンクしている場合は、それが本であると見なして、そのリンクをoutput()
ファイルに書き込むことができます。
完了したら、ファイルを保存して終了します。
次のコマンドを使用して、この新しいタスクを実行します。
- python -m luigi --module word-frequency GetTopBooks --local-scheduler
Luigiは、実行されたタスクの要約を出力します。
Output===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 GetTopBooks()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
data
ディレクトリに、Luigiは新しいファイル(data/books_list.txt
)を作成します。 次のコマンドを実行して、ファイルの内容を出力します。
cat data/books_list.txt
このファイルには、ProjectGutenbergの上位プロジェクトリストから抽出されたURLのリストが含まれています。
Outputhttp://www.gutenberg.org/ebooks/1342.txt.utf-8
http://www.gutenberg.org/ebooks/11.txt.utf-8
http://www.gutenberg.org/ebooks/2701.txt.utf-8
http://www.gutenberg.org/ebooks/1661.txt.utf-8
http://www.gutenberg.org/ebooks/16328.txt.utf-8
http://www.gutenberg.org/ebooks/45858.txt.utf-8
http://www.gutenberg.org/ebooks/98.txt.utf-8
http://www.gutenberg.org/ebooks/84.txt.utf-8
http://www.gutenberg.org/ebooks/5200.txt.utf-8
http://www.gutenberg.org/ebooks/51461.txt.utf-8
...
Webページから本のリストを抽出できるタスクを作成しました。 次のステップでは、中央のLuigiスケジューラーをセットアップします。
ステップ4—Luigiスケジューラーの実行
次に、Luigiスケジューラーを起動して、タスクを実行および視覚化します。 ステップ3で開発したタスクを実行し、Luigiスケジューラーを使用して実行します。
これまで、--local-scheduler
タグを使用してLuigiを実行し、中央スケジューラーに作業を割り当てずにローカルでジョブを実行してきました。 これは開発には役立ちますが、本番環境で使用する場合は、Luigiスケジューラーを使用することをお勧めします。 Luigiスケジューラーは以下を提供します:
- タスクを実行するための中心点。
- タスクの実行の視覚化。
Luigiスケジューラー・インターフェースにアクセスするには、ポート8082
へのアクセスを有効にする必要があります。 これを行うには、次のコマンドを実行します。
- sudo ufw allow 8082/tcp
スケジューラーを実行するには、次のコマンドを実行します。
- sudo sh -c ". luigi-venv/bin/activate ;luigid --background --port 8082"
注: Luigiスケジューラーをバックグラウンドタスクとして起動する前に、virtualenv
アクティブ化スクリプトをrootとして再実行しました。 これは、sudo
を実行すると、virtualenv
環境変数とエイリアスが引き継がれないためです。
ルートとして実行したくない場合は、現在のユーザーのバックグラウンドプロセスとしてLuigiスケジューラーを実行できます。 このコマンドは、Luigiスケジューラーをバックグラウンドで実行し、スケジューラーのバックグラウンドタスクからメッセージを非表示にします。 ターミナルでのバックグラウンドプロセスの管理の詳細については、Bashのジョブコントロールを使用してフォアグラウンドプロセスとバックグラウンドプロセスを管理する方法を参照してください。
- luigid --port 8082 > /dev/null 2> /dev/null &
ブラウザを開いてLuigiインターフェイスにアクセスします。 これは、http://your_server_ip:8082
にあるか、サーバーのドメインを設定している場合はhttp://your_domain:8082
になります。 これにより、Luigiユーザーインターフェイスが開きます。
デフォルトでは、LuigiタスクはLuigiスケジューラーを使用して実行されます。 Luigiスケジューラーを使用して以前のタスクの1つを実行するには、コマンドから--local-scheduler
引数を省略します。 次のコマンドを使用して、ステップ3からタスクを再実行します。
- python -m luigi --module word-frequency GetTopBooks
Luigiスケジューラのユーザーインターフェイスを更新します。 GetTopBooksタスクが実行リストとその実行ステータスに追加されています。
引き続きこのユーザーインターフェイスを参照して、パイプラインの進行状況を監視します。
注: HTTPSを介してLuigiスケジューラーを保護したい場合は、Nginxを介して提供できます。 HTTPSを使用してNginxサーバーをセットアップするには、次の手順に従います。 Ubuntu20.04でLet’sEncryptを使用してNginxを保護する方法。 LuigiサーバーをNginxに接続するための適切なNginx構成の提案については、 Github –Luigi –Pull Request2785を参照してください。
Luigiスケジューラーを起動し、それを使用して実行されたタスクを視覚化しました。 次に、GetTopBooks()
タスクが出力する本のリストをダウンロードするタスクを作成します。
ステップ5—本をダウンロードする
このステップでは、指定された本をダウンロードするためのLuigiタスクを作成します。 この新しく作成されたタスクとステップ3で作成されたタスクの間の依存関係を定義します。
最初にファイルを開きます。
- nano word-frequency.py
次のコードを使用して、GetTopBooks()
タスクに続くクラスをword-frequency.py
ファイルに追加します。
. . .
class DownloadBooks(luigi.Task):
"""
Download a specified list of books
"""
FileID = luigi.IntParameter()
REPLACE_LIST = """.,"';_[]:*-"""
def requires(self):
return GetTopBooks()
def output(self):
return luigi.LocalTarget("data/downloads/{}.txt".format(self.FileID))
def run(self):
with self.input().open("r") as i:
URL = i.read().splitlines()[self.FileID]
with self.output().open("w") as outfile:
book_downloads = requests.get(URL)
book_text = book_downloads.text
for char in self.REPLACE_LIST:
book_text = book_text.replace(char, " ")
book_text = book_text.lower()
outfile.write(book_text)
このタスクでは、Parameter
を紹介します。 この場合、整数パラメーター。 Luigi parameters は、パイプラインの実行に影響を与えるタスクへの入力です。 ここでは、パラメータFileID
を導入して、フェッチするURLのリストの行を指定します。
Luigiタスクにdef requires()
というメソッドを追加しました。 このメソッドでは、このタスクを実行する前に出力が必要なLuigiタスクを定義します。 ステップ3で定義したGetTopBooks()
タスクの出力が必要です。
output()
メソッドでは、ターゲットを定義します。 FileID
パラメーターを使用して、このステップで作成されたファイルの名前を作成します。 この場合、data/downloads/{FileID}.txt
をフォーマットします。
run()
メソッドでは、次のことを行います。
GetTopBooks()
タスクで生成された本のリストを開きます。- パラメータ
FileID
で指定された行からURLを取得します。 requests
ライブラリを使用して、URLから本の内容をダウンロードします。:,.?
のような本の中の特殊文字を除外して、単語分析に含まれないようにします。- テキストを小文字に変換して、大文字と小文字が異なる単語を比較できるようにします。
- フィルタリングされた出力を
output()
メソッドで指定されたファイルに書き込みます。
ファイルを保存して終了します。
次のコマンドを使用して、新しいDownloadBooks()
タスクを実行します。
- python -m luigi --module word-frequency DownloadBooks --FileID 2
このコマンドでは、--FileID
引数を使用してFileID
パラメーターを設定します。
注:名前に_
を含むパラメーターを定義する場合は注意が必要です。 Luigiでそれらを参照するには、_
を-
に置き換える必要があります。 たとえば、端末からタスクを呼び出す場合、File_ID
パラメーターは--File-ID
として参照されます。
次の出力が表示されます。
Output===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 GetTopBooks()
* 1 ran successfully:
- 1 DownloadBooks(FileID=2)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
Luigiが検出した出力から、GetTopBooks()
の出力がすでに生成されており、そのタスクの実行をスキップしていることに注意してください。 この機能を使用すると、以前の実行で成功した出力を再利用できるため、実行する必要のあるタスクの数を最小限に抑えることができます。
別のタスクの出力を使用し、分析する本のセットをダウンロードするタスクを作成しました。 次のステップでは、ダウンロードした本の中で最も一般的な単語を数えるタスクを作成します。
ステップ6—単語を数え、結果を要約する
このステップでは、ステップ5でダウンロードした各本の単語の頻度をカウントするLuigiタスクを作成します。 これは、並行して実行される最初のタスクになります。
最初にファイルをもう一度開きます。
- nano word-frequency.py
word-frequency.py
の先頭に次のインポートを追加します。
from collections import Counter
import pickle
DownloadBooks()
タスクの後に、次のタスクをword-frequency.py
に追加します。 このタスクは、指定された本の前のDownloadBooks()
タスクの出力を取得し、その本で最も一般的な単語を返します。
class CountWords(luigi.Task):
"""
Count the frequency of the most common words from a file
"""
FileID = luigi.IntParameter()
def requires(self):
return DownloadBooks(FileID=self.FileID)
def output(self):
return luigi.LocalTarget(
"data/counts/count_{}.pickle".format(self.FileID),
format=luigi.format.Nop
)
def run(self):
with self.input().open("r") as i:
word_count = Counter(i.read().split())
with self.output().open("w") as outfile:
pickle.dump(word_count, outfile)
requires()
を定義するときは、FileID
パラメーターを次のタスクに渡します。 タスクが別のタスクに依存することを指定するときは、依存するタスクを実行するために必要なパラメーターを指定します。
run()
メソッドでは、次のことを行います。
DownloadBooks()
タスクによって生成されたファイルを開きます。- collectionsライブラリに組み込まれている
Counter
オブジェクトを使用します。 これは、本の中で最も一般的な単語を分析する簡単な方法を提供します。 pickle
ライブラリを使用してPythonCounter
オブジェクトの出力を保存し、後のタスクでそのオブジェクトを再利用できるようにします。pickle
は、Pythonオブジェクトをバイトストリームに変換するために使用するライブラリであり、これを保存して、後のPythonセッションに復元できます。luigi.LocalTarget
のformat
プロパティを設定して、pickle
ライブラリが生成するバイナリ出力を書き込めるようにする必要があります。
ファイルを保存して終了します。
次のコマンドを使用して、新しいCountWords()
タスクを実行します。
- python -m luigi --module word-frequency CountWords --FileID 2
LuigiスケジューラのユーザーインターフェイスでCountWordsタスクグラフビューを開きます。
Hide Done オプションの選択を解除し、 UpstreamDependenciesの選択を解除します。 作成したタスクから実行の流れがわかります。
ダウンロードした本の中で最も一般的な単語を数えるタスクを作成し、それらのタスク間の依存関係を視覚化しました。 次に、タスクの実行をカスタマイズするために使用できるパラメーターを定義します。
ステップ7—構成パラメーターの定義
このステップでは、構成パラメーターをパイプラインに追加します。 これらにより、分析する本の数と結果に含める単語の数をカスタマイズできます。
タスク間で共有するパラメータを設定する場合は、Config()
クラスを作成できます。 他のパイプラインステージは、Config()
クラスで定義されたパラメーターを参照できます。 これらは、ジョブの実行時にパイプラインによって設定されます。
次のConfig()
クラスをword-frequency.py
の最後に追加します。 これにより、分析する本の数と要約に含める最も頻繁な単語の数について、パイプラインに2つの新しいパラメーターが定義されます。
class GlobalParams(luigi.Config):
NumberBooks = luigi.IntParameter(default=10)
NumberTopWords = luigi.IntParameter(default=500)
次のクラスをword-frequency.py
に追加します。 このクラスは、すべてのCountWords()
タスクの結果を集約して、最も頻繁に使用される単語の要約を作成します。
class TopWords(luigi.Task):
"""
Aggregate the count results from the different files
"""
def requires(self):
requiredInputs = []
for i in range(GlobalParams().NumberBooks):
requiredInputs.append(CountWords(FileID=i))
return requiredInputs
def output(self):
return luigi.LocalTarget("data/summary.txt")
def run(self):
total_count = Counter()
for input in self.input():
with input.open("rb") as infile:
nextCounter = pickle.load(infile)
total_count += nextCounter
with self.output().open("w") as f:
for item in total_count.most_common(GlobalParams().NumberTopWords):
f.write("{0: <15}{1}\n".format(*item))
requires()
メソッドでは、タスクが複数の依存タスクの出力を使用する場所のリストを提供できます。 GlobalParams().NumberBooks
パラメーターを使用して、単語数をカウントする必要のある本の数を設定します。
output()
メソッドでは、パイプラインの最終出力となるdata/summary.txt
出力ファイルを定義します。
run()
メソッドでは、次のことを行います。
Counter()
オブジェクトを作成して、合計数を保存します。CountWords()
メソッドで実行されるカウントごとに、ファイルを開いて「アンピック」します(ファイルからPythonオブジェクトに変換し直します)。- ロードされたカウントを追加し、それを合計カウントに追加します。
- 最も一般的な単語をターゲット出力ファイルに書き込みます。
次のコマンドでパイプラインを実行します。
- python -m luigi --module word-frequency TopWords --GlobalParams-NumberBooks 15 --GlobalParams-NumberTopWords 750
Luigiは、上位の単語の要約を生成するために必要な残りのタスクを実行します。
Output===== Luigi Execution Summary =====
Scheduled 31 tasks of which:
* 2 complete ones were encountered:
- 1 CountWords(FileID=2)
- 1 GetTopBooks()
* 29 ran successfully:
- 14 CountWords(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
- 14 DownloadBooks(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
- 1 TopWords()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
Luigiスケジューラーからパイプラインの実行を視覚化できます。 タスクリストでGetTopBooksタスクを選択し、グラフの表示ボタンを押します。
HideDoneおよびUpstreamDependenciesオプションの選択を解除します。
Luigiで行われている処理の流れを示します。
data/summary.txt
ファイルを開きます。
cat data/summary.txt
計算された最も一般的な単語が見つかります:
Outputthe 64593
and 41650
of 31896
to 31368
a 25265
i 23449
in 19496
it 16282
that 15907
he 14974
...
このステップでは、パラメーターを定義して使用し、タスクの実行をカスタマイズしました。 一連の本の最も一般的な単語の要約を生成しました。
このリポジトリでこのチュートリアルのすべてのコードを見つけてください。
結論
このチュートリアルでは、Luigiデータ処理パイプラインと、タスク、パラメーター、構成パラメーター、Luigiスケジューラーなどの主要な機能の使用方法を紹介しました。
Luigiは、箱から出して多数の一般的なデータソースへの接続をサポートしています。 また、大規模で複雑なデータパイプラインを実行するようにスケーリングすることもできます。 これは、データ処理の課題の解決を開始するための強力なフレームワークを提供します。
その他のチュートリアルについては、データ分析トピックページおよびPythonトピックページをご覧ください。