Airflow は、Python ベースのワークフロースケジューリングツールです。データパイプラインや ETL 処理など、依存関係を持つ複数のタスクを効率的に管理・実行できます。この記事では、Airflow の基本的な使い方や実践的な応用例を解説します。
1. Airflow のセットアップ
1.1. インストール
Airflow は pip で簡単にインストールできます。
pip install apache-airflow
1.2. データベースの初期化
Airflow は、ワークフローの定義や実行履歴などのメタデータをデータベースに保存します。 デフォルトでは SQLite が使用されます。本番環境では PostgreSQL や MySQL などのより堅牢なデータベースの使用が推奨されます。
airflow db init
補足:
airflow initdbは古いコマンドのため、airflow db initへ修正しました。
1.3. ユーザーの作成 (オプション)
Web UI にログインするためのユーザーを作成します。
airflow users create \
--username admin \
--firstname 任意の名前 \
--lastname 任意の名前 \
--role Admin \
--email 任意のメールアドレス
パスワードは対話形式で設定します。
2. 基本概念:DAG (Directed Acyclic Graph)
Airflow では、ワークフローを DAG (Directed Acyclic Graph:有向非巡回グラフ) として定義します。
- DAG: ワークフロー全体の定義。タスクの依存関係、実行スケジュールなどを記述します。
- Operator: タスクの実行内容を定義します。
BashOperator: シェルコマンドを実行します。PythonOperator: Python 関数を実行します。EmailOperator: メールを送信します。- その他多数の Operator が用意されています。
- Task: Operator のインスタンス。DAG 内で個々のタスクを識別します。
3. DAG の作成
3.1. 基本的な DAG ファイルの構造
DAG は Python ファイルとして記述します。以下は、最もシンプルな DAG の例です。
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator with DAG( dag_id='simple_bash_dag', # DAGのID(ユニークな名前) start_date=datetime(2023, 10, 26), # DAGの開始日 schedule=timedelta(days=1), # 実行間隔 (毎日) catchup=False, # start_date以降、未実行のDAGを遡って実行するか tags=['example', 'bash'], # DAGを整理するためのタグ ) as dag: # タスク定義 print_date = BashOperator( task_id='print_date', # タスクのID bash_command='date', # 実行するコマンド )
この DAG は、date コマンドを実行する print_date というタスクを1つだけ持ちます。
schedule=timedelta(days=1)と設定されているため、毎日実行されます。
3.2. タスク間の依存関係
複数のタスクがある場合、タスク間の依存関係を定義できます。
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator with DAG( dag_id='bash_dependencies_dag', start_date=datetime(2023, 10, 26), schedule=timedelta(days=1), catchup=False, ) as dag: print_date = BashOperator( task_id='print_date', bash_command='date', ) sleep = BashOperator( task_id='sleep', bash_command='sleep 5', # 5秒待機 ) print_hello = BashOperator( task_id='print_hello', bash_command='echo "Hello, Airflow!"', ) # 依存関係の定義 (print_date -> sleep -> print_hello) print_date >> sleep >> print_hello # または、以下のように記述することも可能です。 # print_date.set_downstream(sleep) # sleep.set_downstream(print_hello)
>> 演算子 (または set_downstream メソッド) を使用して、タスクの実行順序を定義します。上記の例では、print_date、sleep、print_hello の順にタスクが実行されます。
3.3. PythonOperator で Python 関数を実行
Python コードを直接実行するには、PythonOperator を使用します。
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator def my_python_function(name: str, num: int): """ サンプル関数: 名前と数字を受け取り、処理を行う """ print(f"Hello, {name}!") total = sum(range(1, num + 1)) print(f"The sum of numbers from 1 to {num} is {total}.") return total # 戻り値を xcom で他のタスクに渡せる with DAG( dag_id='python_operator_dag', start_date=datetime(2023, 10, 26), schedule=timedelta(days=1), catchup=False, ) as dag: run_python_task = PythonOperator( task_id='run_python_function', python_callable=my_python_function, # 実行する関数 op_kwargs={'name': 'Airflow', 'num': 10}, # 関数に渡す引数 )
python_callable: 実行する Python 関数を指定します。op_kwargs: 関数にキーワード引数として渡す値を辞書形式で指定します。
3.4 PythonVirtualenvOperator
PythonVirtualenvOperator を使うことで、仮想環境を作成し、その中で Python の関数を実行できます。 これにより、異なる DAG 間で、依存関係を気にせず、ライブラリのバージョンを固定できます。
from __future__ import annotations import pendulum from airflow.models.dag import DAG from airflow.operators.python import PythonVirtualenvOperator def callable_virtualenv(): """ 仮想環境で実行する関数。requests を使用して、Web サイトのステータスコードを取得する。 """ import requests response = requests.get("https://www.example.com/") print(response.status_code) if response.status_code == 200: return "example.com へのリクエスト成功" else: return "example.com へのリクエスト失敗" with DAG( dag_id="python_virtualenv_operator_dag", start_date=pendulum.datetime(2023, 10, 26, tz="UTC"), catchup=False, tags=["example"], ) as dag: # [START howto_operator_python_venv] task_with_virtualenv = PythonVirtualenvOperator( task_id="task_with_virtualenv", python_callable=callable_virtualenv, requirements=["requests==2.31.0"], system_site_packages=False, )
requirementsで、仮想環境にインストールするパッケージを指定します。system_site_packagesをFalseにすることで、グローバルのパッケージを使用せず、仮想環境のパッケージのみ利用します。
4. DAG のデプロイと実行
4.1. DAG ファイルの配置
作成した DAG ファイル (Python ファイル) は、Airflow が参照する DAG フォルダに配置します。デフォルトでは、~/airflow/dags です。
このフォルダの場所はairflow.cfg設定ファイルのdags_folderで確認及び変更できます。
4.2. Web サーバーとスケジューラの起動
Airflow の Web UI とスケジューラを起動します。
# 別のターミナルで実行 airflow webserver --port 8080
# 別のターミナルで実行 airflow scheduler
4.3. Web UI での DAG の確認と実行
ブラウザで http://localhost:8080 にアクセスし、Airflow の Web UI を開きます。
デプロイした DAG が一覧表示されます。
UI から DAG を手動で実行したり、実行履歴を確認したりできます。
5. その他の機能
XCom (Cross-Communication)
タスク間で小さなデータ (JSON シリアライズ可能なオブジェクト) を受け渡すための仕組みです。
PythonOperator の戻り値や、task_instance.xcom_push() メソッドで値をプッシュし、task_instance.xcom_pull() メソッドで値を取得できます。
# 値をプッシュするタスク def push_value(**kwargs): value = "My XCom Value" kwargs['ti'].xcom_push(key='my_key', value=value) # 値をプルするタスク def pull_value(**kwargs): ti = kwargs['ti'] value = ti.xcom_pull(task_ids='push_task', key='my_key') print(f"Pulled value: {value}") # DAG定義内で、これらの関数をPythonOperatorとして使用する
変数 (Variables)
設定値やパラメータを Airflow の外部に保存し、DAG 内で参照できます。Web UI または CLI から設定できます。 データベース (メタデータとは別のデータベース) に保存されるため、機密情報の管理には適していません。
from airflow.models import Variable # 変数を取得 my_variable = Variable.get("my_variable_name", default_var="default_value") # 変数を使用 (例: BashOperator内で) print_variable = BashOperator( task_id='print_variable', bash_command=f'echo "{my_variable}"', )
接続 (Connections)
外部システム (データベース、API など) への接続情報を一元管理できます。Web UI または CLI から設定できます。 パスワードなどの機密情報を安全に管理できます。
*PostgreSQL への接続する例
from airflow.providers.postgres.operators.postgres import PostgresOperator # PostgresOperator で接続を使用 sql_task = PostgresOperator( task_id='run_sql', postgres_conn_id='my_postgres_connection', # Airflow UIで設定したConnection ID sql='SELECT * FROM my_table;', )
プール (Pools)
同時実行タスク数を制限するための仕組みです。特定のリソース (データベース接続など) への負荷を制御できます。
DAG 定義内でプールを指定する例
# タスク定義でプールを指定 task_with_pool = BashOperator( task_id='task_with_pool', bash_command='echo "Running in my_pool"', pool='my_pool', # Airflow UIで設定したPool名 priority_weight=10, # プール内での優先度 )
センサー (Sensors)
外部リソース (ファイル、データベースレコード、API レスポンスなど) の状態を監視し、条件が満たされるまで待機するタスクです。
from airflow.sensors.filesystem import FileSensor # ファイルの存在をチェックするセンサー wait_for_file = FileSensor( task_id='wait_for_file', filepath='/path/to/my/file.txt', fs_conn_id='my_file_connection', # オプション: 接続情報を指定 poke_interval=10, # チェック間隔 (秒) timeout=600, # タイムアウト (秒) )
スケジューラ
Airflowのスケジューラは、DAGの実行を管理する重要なコンポーネントです。 主な機能は下記の通りです。
- DAGファイルの解析: スケジューラは定期的にDAGフォルダをスキャンし、DAGファイルの変更を検知します。
- タスクのスケジューリング: DAG定義に基づいて、実行すべきタスクを特定し、実行キューに追加します。
- タスクの状態管理: タスクの実行状態(成功、失敗、再試行など)を追跡し、必要に応じてタスクを再実行します。
- 依存関係の解決: タスク間の依存関係を解決し、適切な順序でタスクを実行します。
スケジューラは、高い信頼性と耐障害性を備えており、大規模なワークフローの実行を安定的にサポートします。スケジューラ自体の設定は、通常 airflow.cfg で行われますが、DAG レベルでスケジューリングに関連する設定を行うことも可能です。
# DAGレベルでのスケジューリング設定の例 dag = DAG( 'my_dag', schedule_interval=timedelta(minutes=30), # 30分ごとに実行 max_active_runs=1, # 同時に実行するDAG Runの最大数 concurrency=5, # DAG内のタスクの同時実行数 catchup=False, # 過去の未実行分を遡って実行しない )
まとめ
本記事では、Airflow の基本的な使い方から、実践的な機能までを幅広く解説しました。Airflow を活用することで、複雑なワークフローを効率的に自動化し、データ処理パイプラインの信頼性を向上させることができます。本番環境での運用では、Executor (CeleryExecutor, KubernetesExecutor など) の設定、監視、ログ管理、セキュリティ対策なども重要になります。
最後に、Pythonでの業務の自動化について学習可能なUdemyのサイトを紹介します。
[PR]