以下の内容はhttps://pydocument.hatenablog.com/entry/2023/04/23/000519より取得しました。


Airflow によるジョブスケジューリング:Pythonでのジョブ管理

Airflow は、Python ベースのワークフロースケジューリングツールです。データパイプラインや ETL 処理など、依存関係を持つ複数のタスクを効率的に管理・実行できます。この記事では、Airflow の基本的な使い方や実践的な応用例を解説します。

1. Airflow のセットアップ

1.1. インストール

Airflow は pip で簡単にインストールできます。

pip install apache-airflow

1.2. データベースの初期化

Airflow は、ワークフローの定義や実行履歴などのメタデータをデータベースに保存します。 デフォルトでは SQLite が使用されます。本番環境では PostgreSQLMySQL などのより堅牢なデータベースの使用が推奨されます。

airflow db init

補足: airflow initdbは古いコマンドのため、airflow db initへ修正しました。

1.3. ユーザーの作成 (オプション)

Web UI にログインするためのユーザーを作成します。

airflow users create \
    --username admin \
    --firstname 任意の名前 \
    --lastname 任意の名前 \
    --role Admin \
    --email 任意のメールアドレス

パスワードは対話形式で設定します。

2. 基本概念:DAG (Directed Acyclic Graph)

Airflow では、ワークフローを DAG (Directed Acyclic Graph:有向非巡回グラフ) として定義します。

  • DAG: ワークフロー全体の定義。タスクの依存関係、実行スケジュールなどを記述します。
  • Operator: タスクの実行内容を定義します。
    • BashOperator: シェルコマンドを実行します。
    • PythonOperator: Python 関数を実行します。
    • EmailOperator: メールを送信します。
    • その他多数の Operator が用意されています。
  • Task: Operator のインスタンス。DAG 内で個々のタスクを識別します。

3. DAG の作成

3.1. 基本的な DAG ファイルの構造

DAG は Python ファイルとして記述します。以下は、最もシンプルな DAG の例です。

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='simple_bash_dag',  # DAGのID(ユニークな名前)
    start_date=datetime(2023, 10, 26),  # DAGの開始日
    schedule=timedelta(days=1),  # 実行間隔 (毎日)
    catchup=False,  # start_date以降、未実行のDAGを遡って実行するか
    tags=['example', 'bash'],  # DAGを整理するためのタグ
) as dag:
    # タスク定義
    print_date = BashOperator(
        task_id='print_date',  # タスクのID
        bash_command='date',  # 実行するコマンド
    )

この DAG は、date コマンドを実行する print_date というタスクを1つだけ持ちます。 schedule=timedelta(days=1)と設定されているため、毎日実行されます。

3.2. タスク間の依存関係

複数のタスクがある場合、タスク間の依存関係を定義できます。

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='bash_dependencies_dag',
    start_date=datetime(2023, 10, 26),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    sleep = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',  # 5秒待機
    )

    print_hello = BashOperator(
        task_id='print_hello',
        bash_command='echo "Hello, Airflow!"',
    )

    # 依存関係の定義 (print_date -> sleep -> print_hello)
    print_date >> sleep >> print_hello
    # または、以下のように記述することも可能です。
    # print_date.set_downstream(sleep)
    # sleep.set_downstream(print_hello)

>> 演算子 (または set_downstream メソッド) を使用して、タスクの実行順序を定義します。上記の例では、print_datesleepprint_hello の順にタスクが実行されます。

3.3. PythonOperator で Python 関数を実行

Python コードを直接実行するには、PythonOperator を使用します。

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def my_python_function(name: str, num: int):
    """
    サンプル関数: 名前と数字を受け取り、処理を行う
    """
    print(f"Hello, {name}!")
    total = sum(range(1, num + 1))
    print(f"The sum of numbers from 1 to {num} is {total}.")
    return total  # 戻り値を xcom で他のタスクに渡せる

with DAG(
    dag_id='python_operator_dag',
    start_date=datetime(2023, 10, 26),
    schedule=timedelta(days=1),
    catchup=False,
) as dag:
    run_python_task = PythonOperator(
        task_id='run_python_function',
        python_callable=my_python_function,  # 実行する関数
        op_kwargs={'name': 'Airflow', 'num': 10},  # 関数に渡す引数
    )
  • python_callable: 実行する Python 関数を指定します。
  • op_kwargs: 関数にキーワード引数として渡す値を辞書形式で指定します。

3.4 PythonVirtualenvOperator

PythonVirtualenvOperator を使うことで、仮想環境を作成し、その中で Python の関数を実行できます。 これにより、異なる DAG 間で、依存関係を気にせず、ライブラリのバージョンを固定できます。

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonVirtualenvOperator

def callable_virtualenv():
    """
    仮想環境で実行する関数。requests を使用して、Web サイトのステータスコードを取得する。
    """
    import requests

    response = requests.get("https://www.example.com/")
    print(response.status_code)
    if response.status_code == 200:
      return "example.com へのリクエスト成功"
    else:
      return "example.com へのリクエスト失敗"

with DAG(
    dag_id="python_virtualenv_operator_dag",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    # [START howto_operator_python_venv]
    task_with_virtualenv = PythonVirtualenvOperator(
        task_id="task_with_virtualenv",
        python_callable=callable_virtualenv,
        requirements=["requests==2.31.0"],
        system_site_packages=False,
    )

requirementsで、仮想環境にインストールするパッケージを指定します。system_site_packagesFalseにすることで、グローバルのパッケージを使用せず、仮想環境のパッケージのみ利用します。

4. DAG のデプロイと実行

4.1. DAG ファイルの配置

作成した DAG ファイル (Python ファイル) は、Airflow が参照する DAG フォルダに配置します。デフォルトでは、~/airflow/dags です。 このフォルダの場所はairflow.cfg設定ファイルのdags_folderで確認及び変更できます。

4.2. Web サーバーとスケジューラの起動

Airflow の Web UI とスケジューラを起動します。

# 別のターミナルで実行
airflow webserver --port 8080
# 別のターミナルで実行
airflow scheduler

4.3. Web UI での DAG の確認と実行

ブラウザで http://localhost:8080 にアクセスし、Airflow の Web UI を開きます。 デプロイした DAG が一覧表示されます。

UI から DAG を手動で実行したり、実行履歴を確認したりできます。

5. その他の機能

XCom (Cross-Communication)

タスク間で小さなデータ (JSON シリアライズ可能なオブジェクト) を受け渡すための仕組みです。 PythonOperator の戻り値や、task_instance.xcom_push() メソッドで値をプッシュし、task_instance.xcom_pull() メソッドで値を取得できます。

# 値をプッシュするタスク
def push_value(**kwargs):
    value = "My XCom Value"
    kwargs['ti'].xcom_push(key='my_key', value=value)

# 値をプルするタスク
def pull_value(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_task', key='my_key')
    print(f"Pulled value: {value}")

# DAG定義内で、これらの関数をPythonOperatorとして使用する

変数 (Variables)

設定値やパラメータを Airflow の外部に保存し、DAG 内で参照できます。Web UI または CLI から設定できます。 データベース (メタデータとは別のデータベース) に保存されるため、機密情報の管理には適していません。

from airflow.models import Variable

# 変数を取得
my_variable = Variable.get("my_variable_name", default_var="default_value")

# 変数を使用 (例: BashOperator内で)
print_variable = BashOperator(
    task_id='print_variable',
    bash_command=f'echo "{my_variable}"',
)

接続 (Connections)

外部システム (データベース、API など) への接続情報を一元管理できます。Web UI または CLI から設定できます。 パスワードなどの機密情報を安全に管理できます。

*PostgreSQL への接続する例

from airflow.providers.postgres.operators.postgres import PostgresOperator

# PostgresOperator で接続を使用
sql_task = PostgresOperator(
    task_id='run_sql',
    postgres_conn_id='my_postgres_connection', # Airflow UIで設定したConnection ID
    sql='SELECT * FROM my_table;',
)

プール (Pools)

同時実行タスク数を制限するための仕組みです。特定のリソース (データベース接続など) への負荷を制御できます。

DAG 定義内でプールを指定する例

# タスク定義でプールを指定
task_with_pool = BashOperator(
    task_id='task_with_pool',
    bash_command='echo "Running in my_pool"',
    pool='my_pool',  # Airflow UIで設定したPool名
    priority_weight=10, # プール内での優先度
)

センサー (Sensors)

外部リソース (ファイル、データベースレコード、API レスポンスなど) の状態を監視し、条件が満たされるまで待機するタスクです。

from airflow.sensors.filesystem import FileSensor

# ファイルの存在をチェックするセンサー
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/my/file.txt',
    fs_conn_id='my_file_connection', # オプション: 接続情報を指定
    poke_interval=10, # チェック間隔 (秒)
    timeout=600, # タイムアウト (秒)
)

スケジューラ

Airflowのスケジューラは、DAGの実行を管理する重要なコンポーネントです。 主な機能は下記の通りです。

  1. DAGファイルの解析: スケジューラは定期的にDAGフォルダをスキャンし、DAGファイルの変更を検知します。
  2. タスクのスケジューリング: DAG定義に基づいて、実行すべきタスクを特定し、実行キューに追加します。
  3. タスクの状態管理: タスクの実行状態(成功、失敗、再試行など)を追跡し、必要に応じてタスクを再実行します。
  4. 依存関係の解決: タスク間の依存関係を解決し、適切な順序でタスクを実行します。

スケジューラは、高い信頼性と耐障害性を備えており、大規模なワークフローの実行を安定的にサポートします。スケジューラ自体の設定は、通常 airflow.cfg で行われますが、DAG レベルでスケジューリングに関連する設定を行うことも可能です。

# DAGレベルでのスケジューリング設定の例

dag = DAG(
    'my_dag',
    schedule_interval=timedelta(minutes=30),  # 30分ごとに実行
    max_active_runs=1,  # 同時に実行するDAG Runの最大数
    concurrency=5,  # DAG内のタスクの同時実行数
    catchup=False,  # 過去の未実行分を遡って実行しない
)

まとめ

本記事では、Airflow の基本的な使い方から、実践的な機能までを幅広く解説しました。Airflow を活用することで、複雑なワークフローを効率的に自動化し、データ処理パイプラインの信頼性を向上させることができます。本番環境での運用では、Executor (CeleryExecutor, KubernetesExecutor など) の設定、監視、ログ管理、セキュリティ対策なども重要になります。

最後に、Pythonでの業務の自動化について学習可能なUdemyのサイトを紹介します。

[PR]

click.linksynergy.com

click.linksynergy.com

click.linksynergy.com




以上の内容はhttps://pydocument.hatenablog.com/entry/2023/04/23/000519より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14