Ubuntu20.04のPythonでLuigiを使用してデータ処理パイプラインを構築する方法
著者は、 Write for DOnations プログラムの一環として、 Free and Open SourceFundを選択して寄付を受け取りました。
序章
Luigi は、 Python パッケージであり、アイテムのバッチでのデータ処理ジョブの自動実行である、長時間実行されるバッチ処理を管理します。 Luigiを使用すると、データ処理ジョブを一連の依存タスクとして定義できます。 たとえば、タスクBはタスクAの出力に依存します。 また、タスクDはタスクBとタスクCの出力に依存します。 Luigiは、要求されたジョブを完了するために実行する必要のあるタスクを自動的に計算します。
全体として、Luigiは、データ処理パイプラインを開発および管理するためのフレームワークを提供します。 もともとはSpotifyによって開発されました。彼は、さまざまなソースからデータをフェッチして処理する必要があるタスクのコレクションをまとめて管理するために使用します。 Luigi内で、Spotifyの開発者は、障害の処理、タスク間の依存関係を自動的に解決する機能、タスク処理の視覚化など、バッチ処理のニーズに対応する機能を構築しました。 SpotifyはLuigiを使用して、ユーザーへの音楽の推奨事項の提供、内部ダッシュボードへの入力、トップソングのリストの計算など、バッチ処理ジョブをサポートします。
このチュートリアルでは、データ処理パイプラインを構築して、 ProjectGutenbergに関する最も人気のある本から最も一般的な単語を分析します。 これを行うには、Luigiパッケージを使用してパイプラインを構築します。 Luigiのタスク、ターゲット、依存関係、およびパラメーターを使用して、パイプラインを構築します。
前提条件
このチュートリアルを完了するには、次のものが必要です。
- root以外のユーザーでセットアップされたUbuntuサーバー
sudo
特権。 Ubuntu20.04を使用したサーバーの初期設定ガイドに従ってください。 - Python3.6以降および
virtualenv
インストールされています。 Python3をインストールしてUbuntu20.04にローカルプログラミング環境をセットアップする方法に従ってPythonを構成してインストールするvirtualenv
. このチュートリアルでは、環境フォルダーとプロジェクトフォルダーを設定します。
ステップ1—Luigiをインストールする
このステップでは、Luigiインストール用のクリーンなサンドボックス環境を作成します。
まず、プロジェクトディレクトリを作成します。 このチュートリアルの場合 luigi-demo
:
- mkdir luigi-demo
新しく作成されたものに移動します luigi-demo
ディレクトリ:
- cd luigi-demo
新しい仮想環境を作成する luigi-venv
:
- python3 -m venv luigi-venv
そして、新しく作成された仮想環境をアクティブ化します。
- . luigi-venv/bin/activate
見つけるだろう (luigi-venv)
ターミナルプロンプトの前面に追加され、アクティブな仮想環境を示します。
Output(luigi-venv) username@hostname:~/luigi-demo$
このチュートリアルでは、次の3つのライブラリが必要です。 luigi
, beautifulsoup4
、 と requests
. The requests
ライブラリはHTTPリクエストの作成を合理化します。 これを使用して、ProjectGutenbergの書籍リストと分析する書籍をダウンロードします。 The beautifulsoup4
ライブラリは、Webページからのデータを解析する関数を提供します。 これを使用して、ProjectGutenbergサイトで最も人気のある本のリストを解析します。
次のコマンドを実行して、pipを使用してこれらのライブラリをインストールします。
- pip install wheel luigi beautifulsoup4 requests
最新バージョンのライブラリとそのすべての依存関係のインストールを確認する応答が表示されます。
OutputSuccessfully installed beautifulsoup4-4.9.1 certifi-2020.6.20 chardet-3.0.4 docutils-0.16 idna-2.10 lockfile-0.12.2 luigi-3.0.1 python-daemon-2.2.4 python-dateutil-2.8.1 requests-2.24.0 six-1.15.0 soupsieve-2.0.1 tornado-5.1.1 urllib3-1.25.10
プロジェクトの依存関係をインストールしました。 次に、最初のLuigiタスクの作成に進みます。
ステップ2—Luigiタスクを作成する
このステップでは、「Hello World」Luigiタスクを作成して、それらがどのように機能するかを示します。
Luigi task は、パイプラインの実行と、各タスクの入力および出力の依存関係の定義が行われる場所です。 タスクは、パイプラインを作成するための構成要素です。 それらをクラスで定義します。クラスには次のものが含まれます。
- A
run()
タスクを実行するためのロジックを保持するメソッド。 - アン
output()
タスクによって生成されたアーティファクトを返すメソッド。 Therun()
メソッドはこれらのアーティファクトを設定します。 - オプション
input()
現在のタスクを実行するために必要なパイプライン内の追加のタスクを返すメソッド。 Therun()
メソッドはこれらを使用してタスクを実行します。
新しいファイルを作成する hello-world.py
:
- nano hello-world.py
次に、次のコードをファイルに追加します。
import luigi
class HelloLuigi(luigi.Task):
def output(self):
return luigi.LocalTarget('hello-luigi.txt')
def run(self):
with self.output().open("w") as outfile:
outfile.write("Hello Luigi!")
あなたはそれを定義します HelloLuigi()
を追加することによるLuigiタスクです luigi.Task
ミックスインに。
The output()
メソッドは1つ以上を定義します Target
タスクが生成する出力。 この例の場合、次のように定義します。 luigi.LocalTarget
、ローカルファイルです。
注: Luigiを使用すると、 AWS S3バケット、 MongoDBデータベース、SQLデータベースなどのさまざまな一般的なデータソースに接続できます。 サポートされているデータソースの完全なリストは、 Luigidocsにあります。
The run()
メソッドには、パイプラインステージで実行するコードが含まれています。 この例では、 output()
書き込みモードのターゲットファイル、 self.output().open("w") as outfile:
と書く "Hello Luigi!"
それに outfile.write("Hello Luigi!")
.
作成したタスクを実行するには、次のコマンドを実行します。
- python -m luigi --module hello-world HelloLuigi --local-scheduler
ここでは、を使用してタスクを実行します python -m
実行する代わりに luigi
直接コマンド; これは、Luigiが実行できるのは現在のコード内にあるコードのみであるためです。 PYTHONPATH
. または、追加することもできます PYTHONPATH='.'
次のように、Luigiコマンドの前に:
- PYTHONPATH='.' luigi --module hello-world HelloLuigi --local-scheduler
とともに --module hello-world HelloLuigi
フラグを立てると、実行するPythonモジュールとLuigiタスクをLuigiに指示します。
The --local-scheduler
フラグは、Luigiスケジューラーに接続せず、代わりにこのタスクをローカルで実行するようにLuigiに指示します。 (Luigiスケジューラについてはステップ4 で説明します。)を使用してタスクを実行する local-scheduler
フラグは開発作業にのみ推奨されます。
Luigiは、実行されたタスクの要約を出力します。
Output===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 HelloLuigi()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
そして、それは新しいファイルを作成します hello-luigi.txt
コンテンツ付き:
Hello Luigi!
ファイルを生成するLuigiタスクを作成し、Luigiを使用して実行しました local-scheduler
. 次に、Webページから本のリストを抽出できるタスクを作成します。
ステップ3—本のリストを抽出するタスクを作成する
このステップでは、Luigiタスクを作成し、 run()
ProjectGutenbergで最も人気のある本のリストをダウンロードするタスクのメソッド。 を定義します output()
これらの本へのリンクをファイルに保存する方法。 これらは、Luigiローカルスケジューラを使用して実行します。
新しいディレクトリを作成します data
あなたの内側 luigi-demo
ディレクトリ。 これは、で定義されたファイルを保存する場所になります output()
あなたのタスクのメソッド。 タスクを実行する前にディレクトリを作成する必要があります。まだ存在していないディレクトリにファイルを書き込もうとすると、Pythonは例外をスローします。
- mkdir data
- mkdir data/counts
- mkdir data/downloads
新しいファイルを作成する word-frequency.py
:
- nano word-frequency.py
次のコードを挿入します。これは、 ProjectGutenbergで最も読まれている本へのリンクのリストを抽出するLuigiタスクです。
import requests
import luigi
from bs4 import BeautifulSoup
class GetTopBooks(luigi.Task):
"""
Get list of the most popular books from Project Gutenberg
"""
def output(self):
return luigi.LocalTarget("data/books_list.txt")
def run(self):
resp = requests.get("http://www.gutenberg.org/browse/scores/top")
soup = BeautifulSoup(resp.content, "html.parser")
pageHeader = soup.find_all("h2", string="Top 100 EBooks yesterday")[0]
listTop = pageHeader.find_next_sibling("ol")
with self.output().open("w") as f:
for result in listTop.select("li>a"):
if "/ebooks/" in result["href"]:
f.write("http://www.gutenberg.org{link}.txt.utf-8\n"
.format(
link=result["href"]
)
)
あなたは output()
ファイルのターゲット "data/books_list.txt"
本のリストを保存します。
の中に run()
メソッド、あなた:
- 使用
requests
プロジェクトグーテンベルクのトップブックページのHTMLコンテンツをダウンロードするためのライブラリ。 - 使用
BeautifulSoup
ページのコンテンツを解析するためのライブラリ。 TheBeautifulSoup
ライブラリを使用すると、Webページから情報を取得できます。 の使用について詳しく知るにはBeautifulSoup library
、美しいスープとPython3チュートリアルでWebページをスクレイプする方法をお読みください。 - で定義された出力ファイルを開きます
output()
方法。 - HTML構造を繰り返し処理して、昨日のトップ100電子書籍リストのすべてのリンクを取得します。 このページでは、これはすべてのリンクを検索しています
<a>
リストアイテム内にある<li>
. それらのリンクのそれぞれについて、それらが以下を含むリンクを指すページにリンクしている場合/ebooks/
、あなたはそれが本であると仮定して、あなたへのそのリンクを書くことができますoutput()
ファイル。
完了したら、ファイルを保存して終了します。
次のコマンドを使用して、この新しいタスクを実行します。
- python -m luigi --module word-frequency GetTopBooks --local-scheduler
Luigiは、実行されたタスクの要約を出力します。
Output===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 GetTopBooks()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
の中に data
ディレクトリ、Luigiは新しいファイルを作成します(data/books_list.txt
). 次のコマンドを実行して、ファイルの内容を出力します。
cat data/books_list.txt
このファイルには、ProjectGutenbergの上位プロジェクトリストから抽出されたURLのリストが含まれています。
Outputhttp://www.gutenberg.org/ebooks/1342.txt.utf-8
http://www.gutenberg.org/ebooks/11.txt.utf-8
http://www.gutenberg.org/ebooks/2701.txt.utf-8
http://www.gutenberg.org/ebooks/1661.txt.utf-8
http://www.gutenberg.org/ebooks/16328.txt.utf-8
http://www.gutenberg.org/ebooks/45858.txt.utf-8
http://www.gutenberg.org/ebooks/98.txt.utf-8
http://www.gutenberg.org/ebooks/84.txt.utf-8
http://www.gutenberg.org/ebooks/5200.txt.utf-8
http://www.gutenberg.org/ebooks/51461.txt.utf-8
...
Webページから本のリストを抽出できるタスクを作成しました。 次のステップでは、中央のLuigiスケジューラーをセットアップします。
ステップ4—Luigiスケジューラーの実行
次に、Luigiスケジューラーを起動して、タスクを実行および視覚化します。 ステップ3で開発したタスクを実行し、Luigiスケジューラーを使用して実行します。
これまで、Luigiを使用して実行しています --local-scheduler
中央スケジューラに作業を割り当てずにローカルでジョブを実行するためのタグ。 これは開発には役立ちますが、本番環境で使用する場合は、Luigiスケジューラーを使用することをお勧めします。 Luigiスケジューラーは以下を提供します:
- タスクを実行するための中心点。
- タスクの実行の視覚化。
Luigiスケジューラー・インターフェースにアクセスするには、ポートへのアクセスを有効にする必要があります 8082
. これを行うには、次のコマンドを実行します。
- sudo ufw allow 8082/tcp
スケジューラーを実行するには、次のコマンドを実行します。
- sudo sh -c ". luigi-venv/bin/activate ;luigid --background --port 8082"
注:再実行しました virtualenv
Luigiスケジューラーをバックグラウンドタスクとして起動する前に、スクリプトをrootとしてアクティブ化します。 これは、実行時に sudo
the virtualenv
環境変数とエイリアスは引き継がれません。
ルートとして実行したくない場合は、現在のユーザーのバックグラウンドプロセスとしてLuigiスケジューラーを実行できます。 このコマンドは、Luigiスケジューラーをバックグラウンドで実行し、スケジューラーのバックグラウンドタスクからメッセージを非表示にします。 ターミナルでのバックグラウンドプロセスの管理の詳細については、Bashのジョブコントロールを使用してフォアグラウンドプロセスとバックグラウンドプロセスを管理する方法を参照してください。
- luigid --port 8082 > /dev/null 2> /dev/null &
ブラウザを開いてLuigiインターフェイスにアクセスします。 これはどちらかになります http://your_server_ip:8082
、またはサーバーのドメインを設定している場合 http://your_domain:8082
. これにより、Luigiユーザーインターフェイスが開きます。
デフォルトでは、LuigiタスクはLuigiスケジューラーを使用して実行されます。 Luigiスケジューラーを使用して以前のタスクの1つを実行するには、 --local-scheduler
コマンドからの引数。 次のコマンドを使用して、ステップ3からタスクを再実行します。
- python -m luigi --module word-frequency GetTopBooks
Luigiスケジューラのユーザーインターフェイスを更新します。 GetTopBooksタスクが実行リストとその実行ステータスに追加されています。
引き続きこのユーザーインターフェイスを参照して、パイプラインの進行状況を監視します。
注: HTTPSを介してLuigiスケジューラーを保護したい場合は、Nginxを介して提供できます。 HTTPSを使用してNginxサーバーをセットアップするには、次の手順に従います。 Ubuntu20.04でLet’sEncryptを使用してNginxを保護する方法。 LuigiサーバーをNginxに接続するための適切なNginx構成の提案については、 Github –Luigi –Pull Request2785を参照してください。
Luigiスケジューラーを起動し、それを使用して実行されたタスクを視覚化しました。 次に、本のリストをダウンロードするタスクを作成します。 GetTopBooks()
タスク出力。
ステップ5—本をダウンロードする
このステップでは、指定された本をダウンロードするためのLuigiタスクを作成します。 この新しく作成されたタスクとステップ3で作成されたタスクの間の依存関係を定義します。
最初にファイルを開きます。
- nano word-frequency.py
次のクラスを追加します GetTopBooks()
へのタスク word-frequency.py
次のコードでファイルします。
. . .
class DownloadBooks(luigi.Task):
"""
Download a specified list of books
"""
FileID = luigi.IntParameter()
REPLACE_LIST = """.,"';_[]:*-"""
def requires(self):
return GetTopBooks()
def output(self):
return luigi.LocalTarget("data/downloads/{}.txt".format(self.FileID))
def run(self):
with self.input().open("r") as i:
URL = i.read().splitlines()[self.FileID]
with self.output().open("w") as outfile:
book_downloads = requests.get(URL)
book_text = book_downloads.text
for char in self.REPLACE_LIST:
book_text = book_text.replace(char, " ")
book_text = book_text.lower()
outfile.write(book_text)
このタスクでは、 Parameter
; この場合、整数パラメーター。 Luigi parameters は、パイプラインの実行に影響を与えるタスクへの入力です。 ここではパラメータを紹介します FileID
フェッチするURLのリストの行を指定します。
Luigiタスクにメソッドを追加しました。 def requires()
; このメソッドでは、このタスクを実行する前に出力が必要なLuigiタスクを定義します。 あなたはの出力が必要です GetTopBooks()
ステップ3で定義したタスク。
の中に output()
メソッドでは、ターゲットを定義します。 あなたは FileID
このステップで作成されたファイルの名前を作成するためのパラメーター。 この場合、フォーマットします data/downloads/{FileID}.txt
.
の中に run()
メソッド、あなた:
- で生成された本のリストを開きます
GetTopBooks()
仕事。 - パラメータで指定された行からURLを取得します
FileID
. - 使用
requests
URLから本の内容をダウンロードするためのライブラリ。 - 本の中の特殊文字を次のように除外します
:,.?
、したがって、それらは単語分析に含まれません。 - テキストを小文字に変換して、大文字と小文字が異なる単語を比較できるようにします。
- フィルタリングされた出力をで指定されたファイルに書き込みます
output()
方法。
ファイルを保存して終了します。
新しいを実行します DownloadBooks()
このコマンドを使用するタスク:
- python -m luigi --module word-frequency DownloadBooks --FileID 2
このコマンドでは、 FileID
を使用したパラメータ --FileID
口論。
注:パラメータをで定義するときは注意してください _
名前に。 Luigiでそれらを参照するには、 _
のために -
. たとえば、 File_ID
パラメータは次のように参照されます --File-ID
端末からタスクを呼び出すとき。
次の出力が表示されます。
Output===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
- 1 GetTopBooks()
* 1 ran successfully:
- 1 DownloadBooks(FileID=2)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
Luigiが検出した出力から、次の出力がすでに生成されていることに注意してください。 GetTopBooks()
そのタスクの実行をスキップしました。 この機能を使用すると、以前の実行で成功した出力を再利用できるため、実行する必要のあるタスクの数を最小限に抑えることができます。
別のタスクの出力を使用し、分析する本のセットをダウンロードするタスクを作成しました。 次のステップでは、ダウンロードした本の中で最も一般的な単語を数えるタスクを作成します。
ステップ6—単語を数え、結果を要約する
このステップでは、ステップ5でダウンロードした各本の単語の頻度をカウントするLuigiタスクを作成します。 これは、並行して実行される最初のタスクになります。
最初にファイルをもう一度開きます。
- nano word-frequency.py
次のインポートをの上部に追加します word-frequency.py
:
from collections import Counter
import pickle
次のタスクをに追加します word-frequency.py
、あなたの後 DownloadBooks()
仕事。 このタスクは、前のタスクの出力を取得します DownloadBooks()
指定された本のタスクであり、その本で最も一般的な単語を返します。
class CountWords(luigi.Task):
"""
Count the frequency of the most common words from a file
"""
FileID = luigi.IntParameter()
def requires(self):
return DownloadBooks(FileID=self.FileID)
def output(self):
return luigi.LocalTarget(
"data/counts/count_{}.pickle".format(self.FileID),
format=luigi.format.Nop
)
def run(self):
with self.input().open("r") as i:
word_count = Counter(i.read().split())
with self.output().open("w") as outfile:
pickle.dump(word_count, outfile)
定義するとき requires()
あなたは合格します FileID
次のタスクへのパラメータ。 タスクが別のタスクに依存することを指定するときは、依存するタスクを実行するために必要なパラメーターを指定します。
の中に run()
あなたの方法:
- によって生成されたファイルを開きます
DownloadBooks()
仕事。 - ビルトインを使用する
Counter
collectionsライブラリのオブジェクト。 これは、本の中で最も一般的な単語を分析する簡単な方法を提供します。 - 使用
pickle
Pythonの出力を保存するライブラリCounter
オブジェクトなので、後のタスクでそのオブジェクトを再利用できます。pickle
は、Pythonオブジェクトをバイトストリームに変換するために使用するライブラリです。バイトストリームを保存して、後のPythonセッションに復元できます。 を設定する必要がありますformat
のプロパティluigi.LocalTarget
バイナリ出力を書き込めるようにするにはpickle
ライブラリが生成します。
ファイルを保存して終了します。
新しいを実行します CountWords()
このコマンドを使用するタスク:
- python -m luigi --module word-frequency CountWords --FileID 2
LuigiスケジューラのユーザーインターフェイスでCountWordsタスクグラフビューを開きます。
Hide Done オプションの選択を解除し、 UpstreamDependenciesの選択を解除します。 作成したタスクから実行の流れがわかります。
ダウンロードした本で最も一般的な単語を数えるタスクを作成し、それらのタスク間の依存関係を視覚化しました。 次に、タスクの実行をカスタマイズするために使用できるパラメーターを定義します。
ステップ7—構成パラメーターの定義
このステップでは、構成パラメーターをパイプラインに追加します。 これらにより、分析する本の数と結果に含める単語の数をカスタマイズできます。
タスク間で共有されるパラメータを設定する場合は、 Config()
クラス。 他のパイプラインステージは、で定義されたパラメータを参照できます。 Config()
クラス; これらは、ジョブの実行時にパイプラインによって設定されます。
以下を追加します Config()
クラスの最後まで word-frequency.py
. これにより、分析する本の数と要約に含める最も頻繁な単語の数について、パイプラインに2つの新しいパラメーターが定義されます。
class GlobalParams(luigi.Config):
NumberBooks = luigi.IntParameter(default=10)
NumberTopWords = luigi.IntParameter(default=500)
次のクラスをに追加します word-frequency.py
. このクラスは、すべての結果を集約します CountWords()
最も頻繁に使用される単語の要約を作成するタスク:
class TopWords(luigi.Task):
"""
Aggregate the count results from the different files
"""
def requires(self):
requiredInputs = []
for i in range(GlobalParams().NumberBooks):
requiredInputs.append(CountWords(FileID=i))
return requiredInputs
def output(self):
return luigi.LocalTarget("data/summary.txt")
def run(self):
total_count = Counter()
for input in self.input():
with input.open("rb") as infile:
nextCounter = pickle.load(infile)
total_count += nextCounter
with self.output().open("w") as f:
for item in total_count.most_common(GlobalParams().NumberTopWords):
f.write("{0: <15}{1}\n".format(*item))
の中に requires()
メソッドでは、タスクが複数の依存タスクの出力を使用する場所のリストを提供できます。 あなたは GlobalParams().NumberBooks
単語数が必要な本の数を設定するパラメータ。
の中に output()
メソッド、あなたは定義します data/summary.txt
パイプラインの最終出力となる出力ファイル。
の中に run()
あなたの方法:
- 作成する
Counter()
合計数を格納するオブジェクト。 - ファイルを開き、「アンピック」(ファイルからPythonオブジェクトに変換)します。
CountWords()
方法 - ロードされたカウントを追加し、それを合計カウントに追加します。
- 最も一般的な単語をターゲット出力ファイルに書き込みます。
次のコマンドでパイプラインを実行します。
- python -m luigi --module word-frequency TopWords --GlobalParams-NumberBooks 15 --GlobalParams-NumberTopWords 750
Luigiは、上位の単語の要約を生成するために必要な残りのタスクを実行します。
Output===== Luigi Execution Summary =====
Scheduled 31 tasks of which:
* 2 complete ones were encountered:
- 1 CountWords(FileID=2)
- 1 GetTopBooks()
* 29 ran successfully:
- 14 CountWords(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
- 14 DownloadBooks(FileID=0,1,10,11,12,13,14,3,4,5,6,7,8,9)
- 1 TopWords()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
Luigiスケジューラーからパイプラインの実行を視覚化できます。 タスクリストからGetTopBooksタスクを選択し、グラフの表示ボタンを押します。
HideDoneおよびUpstreamDependenciesオプションの選択を解除します。
Luigiで行われている処理の流れを示します。
を開きます data/summary.txt
ファイル:
cat data/summary.txt
計算された最も一般的な単語が見つかります:
Outputthe 64593
and 41650
of 31896
to 31368
a 25265
i 23449
in 19496
it 16282
that 15907
he 14974
...
このステップでは、パラメーターを定義して使用し、タスクの実行をカスタマイズしました。 一連の本の最も一般的な単語の要約を生成しました。
このリポジトリでこのチュートリアルのすべてのコードを見つけてください。
結論
このチュートリアルでは、Luigiデータ処理パイプラインと、タスク、パラメーター、構成パラメーター、Luigiスケジューラーなどの主要な機能の使用方法を紹介しました。
Luigiは、箱から出して多数の一般的なデータソースへの接続をサポートしています。 また、大規模で複雑なデータパイプラインを実行するようにスケーリングすることもできます。 これは、データ処理の課題の解決を開始するための強力なフレームワークを提供します。
その他のチュートリアルについては、データ分析トピックページおよびPythonトピックページをご覧ください。