◾️Introduction
In Apache Airflow, let's look at how to run another DAG, "world", after a DAG, "hello", completes.
Table of contents
【0】Airflow environment setup
【1】Using TriggerDagRunOperator
 1) Sample
【2】Using ExternalTaskSensor
 1) Sample
【0】Airflow environment setup
* See the following related article
Apache Airflow ~ Environment Setup / Docker ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
【1】Using TriggerDagRunOperator
* For the flow DAG "hello" → DAG "world", implement TriggerDagRunOperator in the calling DAG "hello" (the trigger lives on the caller side)
1) Sample
hello.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum

dag = DAG(
    dag_id="hello",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
)

task1 = BashOperator(
    task_id='task1',
    bash_command='echo "success from hello dag"',
    dag=dag,
)

# ★Note this part
task2 = TriggerDagRunOperator(
    task_id="task2",
    trigger_dag_id="world",
    dag=dag,
)

task1 >> task2
world.py
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import pendulum

dag = DAG(
    dag_id="world",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
)

task1 = EmptyOperator(
    task_id="task1",
    dag=dag,
)

task2 = BashOperator(
    task_id="task2",
    bash_command='echo "success from world dag"',
    dag=dag,
)

task1 >> task2
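As a supplement not covered in the sample above: TriggerDagRunOperator can also hand a payload to the triggered DAG via its conf argument and block until that DAG run finishes via wait_for_completion, and the called DAG can read the payload through the dag_run.conf template variable. A minimal sketch under those assumptions (the DAG ids "hello_with_conf" / "world_with_conf" and the payload key "message" are hypothetical names for this sketch):

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Caller side: trigger "world_with_conf" with a payload and wait for it.
with DAG(
    dag_id="hello_with_conf",  # hypothetical DAG id for this sketch
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    trigger = TriggerDagRunOperator(
        task_id="trigger_world",
        trigger_dag_id="world_with_conf",
        conf={"message": "from hello dag"},  # payload key is just an example
        wait_for_completion=True,  # block until the triggered run finishes
        poke_interval=30,  # seconds between status checks while waiting
    )

# Called side: read the payload via the dag_run.conf template variable.
with DAG(
    dag_id="world_with_conf",  # hypothetical DAG id for this sketch
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule=None,  # only runs when triggered
    catchup=False,
):
    show_conf = BashOperator(
        task_id="show_conf",
        bash_command='echo "got: {{ dag_run.conf.get(\'message\', \'no message\') }}"',
    )
```

Note that wait_for_completion turns the trigger into a blocking task, which occupies a worker slot for the whole wait in the default mode, so for long-running downstream DAGs it may be worth checking whether your Airflow version supports a deferrable variant of this operator.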
【2】Using ExternalTaskSensor
* For the flow DAG "hello" → DAG "world", implement ExternalTaskSensor in the called DAG "world"
=> ExternalTaskSensor waits until the specified task in the specified DAG completes
=> The sample below also uses ExternalTaskMarker on the calling side; clearing a marker task cascades the clear to the corresponding tasks in the other DAG
1) Sample
hello.py
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import (
    ExternalTaskMarker,
    ExternalTaskSensor,
)

with DAG(
    dag_id='hello_v2',
    description='A Sample ExternalTaskMarker DAG',
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=['demo'],
) as hello_v2_dag:
    task1_1 = EmptyOperator(task_id='task1_1')
    # ExternalTaskMarker
    # https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py
    task1_2 = ExternalTaskMarker(
        task_id="task1_2",
        external_dag_id="world_v2",
        external_task_id="task2_2",
    )
    # setting up dependencies.
    task1_1 >> task1_2

with DAG(
    dag_id="world_v2",
    description='A Sample ExternalTaskSensor DAG',
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=['demo'],
) as world_v2_dag:
    # ExternalTaskSensor
    # https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py
    task2_1 = ExternalTaskSensor(
        task_id='task2_1',
        external_dag_id=hello_v2_dag.dag_id,
        external_task_id='task1_2',
        timeout=600,
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        mode='poke',
    )
    task2_2 = EmptyOperator(task_id='task2_2')
    # setting up dependencies.
    task2_1 >> task2_2
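One caveat worth knowing: ExternalTaskSensor matches runs by logical (execution) date, so the sample above works because both DAGs share the same "@daily" schedule. If the two DAGs run on different schedules, the sensor will never find a matching run unless you offset the date it looks for, which is what the execution_delta (or execution_date_fn) argument is for. A minimal sketch, assuming a hypothetical DAG "world_v3" that runs one hour after "hello_v2":

```python
from datetime import timedelta
import pendulum
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="world_v3",  # hypothetical DAG id for this sketch
    schedule="0 1 * * *",  # runs at 01:00 daily
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
):
    # "hello_v2" runs at 00:00 daily, one hour before this DAG, so shift
    # the logical date the sensor looks for back by one hour.
    wait_for_hello = ExternalTaskSensor(
        task_id="wait_for_hello",
        external_dag_id="hello_v2",
        external_task_id="task1_2",
        execution_delta=timedelta(hours=1),
        timeout=600,
        mode="reschedule",  # free the worker slot between pokes
    )
```

mode="reschedule" is used here instead of the sample's mode="poke" so the sensor gives up its worker slot between checks, which matters when the wait can be long.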
References
https://qiita.com/munaita/items/5b6444880722df74ac61
https://engineer-boost.com/google-cloud/?p=3077
https://qiita.com/______________-/items/8ff5b62d26f936557fb8
Related articles
Apache Airflow ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Environment Setup / Docker ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ Creating DAGs ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ How to pause another DAG after a DAG fails ~
https://dk521123.hatenablog.com/entry/2026/01/24/003046