前回に引き続き、ETLツールをいろいろさわってみたいと思ったところから、今回はPython製のETLツールLuigiをさわってみた。
インストール
pip install luigi
バージョン確認
$ python --version Python 3.9.1 luigi 3.2.0
サンプルコード
import luigi class Hello(luigi.Task): def run(self): out = self.output() with out.open("w") as f: f.write("Hello") def output(self): return luigi.LocalTarget("hello.txt") class World(luigi.Task): def requires(self): return Hello() def run(self): input_ = self.input() output = self.output() out = self.output() with out.open("w") as f: f.write("World") def output(self): return luigi.LocalTarget("World.txt") def main(): luigi.run(main_task_cls=World, local_scheduler=True) if __name__ == "__main__": main()
実行結果
無事にHelloとWorldが出力されている。
$ python hello_world.py
DEBUG: Checking if World() is complete
DEBUG: Checking if Hello() is complete
INFO: Informed scheduler that task World__99914b932b has status PENDING
INFO: Informed scheduler that task Hello__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running Hello()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done Hello()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Hello__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running World()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done World()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task World__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
- 1 Hello()
- 1 World()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
luigiの感想
Prefectはタスクとフローという概念があった。
luigiは、Worldクラスの中でHelloをrequires の形で呼んでいるところから、タスクを連続で擬似的なフローを実行していく思想で構成されているように感じた。個人的には、一つの処理の流れがまとまって見えたほうが見通しが良いと感じるので、luigiのWorldタスクの中でHelloタスクを参照している形は処理の流れが追いくくなるので苦手意識を感じた。
一方で、タスクの実行履歴コントロールをファイル出力で管理している点は個人的には手軽で良いと感じた。サンプルコードではoutput関数を呼んでいる部分になる。実行後のディレクトリ配下は次のようになっている。
$ ls Pipfile Pipfile.lock World.txt hello.txt hello_world.py
実行履歴のコントロールは実際の運用面を考えてくると出る問題であるが、ファイル出力ができるということはクラウドのストレージを選べるということであり、安く運用することができる可能性が高まる。また、使いやすさの面でもブラウザ上からストレージ操作することも可能になるので、RDBで管理するよりも運用のハードルも下がると感じる。