以下の内容はhttps://dk521123.hatenablog.com/entry/2026/01/23/222936より取得しました。


【Airflow】Apache Airflow ~ DAG完了後に別DAG実行を考える ~

◾️はじめに

 Apache Airflowにおいて、
DAG「hello」完了後に、別DAG「world」を実行させる方法を考える

目次

【0】Airflow環境設定
【1】TriggerDagRunOperator を使用する
 1)サンプル
【2】ExternalTaskSensor を使用する
 1)サンプル

【0】Airflow環境設定

* 以下の関連記事を参照のこと

Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840

【1】TriggerDagRunOperator を使用する

* DAG「hello」 → DAG「world」で、
 呼び出し元のDAG「hello」にTriggerDagRunOperatorを実装する

1)サンプル

hello.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum


dag = DAG(
    dag_id="hello",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
)

task1 = BashOperator(
    task_id='task1',
    bash_command='echo "success from hello dag"',
    dag=dag
)

# ★ここに注目
task2 = TriggerDagRunOperator(
    task_id="task2",
    trigger_dag_id="world", 
    dag=dag,
)

task1 >> task2

world.py

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import pendulum


dag = DAG(
    dag_id="world",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
)

task1 = EmptyOperator(
    task_id="task1",
    dag=dag,
)

task2 = BashOperator(
    task_id="task2",
    bash_command='echo "success from world dag"',
    dag=dag,
)

task1 >> task2

【2】ExternalTaskSensor を使用する

* DAG「hello」 → DAG「world」で、
 呼び出し先のDAG「world」にExternalTaskSensorを実装する
 => ExternalTaskSensorは、指定したDAG内のタスク完了を待つ

1)サンプル

hello.py

from datetime import timedelta
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import (
    ExternalTaskMarker,
    ExternalTaskSensor,
)

with DAG(
    dag_id='hello_v2',
    description='A Sample ExternalTaskMarker DAG',
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=['demo'],
) as hello_v2_dag:

    task1_1 = EmptyOperator(task_id='task1_1')

    # ExternalTaskMarker
    # https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py
    task1_2 = ExternalTaskMarker(
        task_id="task1_2",
        external_dag_id="world_v2",
        external_task_id="task2_2",
    )

    # setting up dependencies.
    task1_1 >> task1_2

with DAG(
    dag_id="world_v2",
    description='A Sample ExternalTaskSensor DAG',
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=['demo'],
) as world_v2_dag:

    # ExternalTaskSensor
    # https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py
    task2_1 = ExternalTaskSensor(
        task_id='task2_1',
        external_dag_id=hello_v2_dag.dag_id,
        external_task_id='task1_2',
        timeout=600,
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        mode='poke',
    )

    task2_2 = EmptyOperator(task_id='task2_2')

    # setting up dependencies.
    task2_1 >> task2_2

参考文献

https://qiita.com/munaita/items/5b6444880722df74ac61
https://engineer-boost.com/google-cloud/?p=3077
https://qiita.com/
______________-/items/8ff5b62d26f936557fb8

関連記事

Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ DAG作成 ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ DAGが失敗後に別DAGをPauseするには ~
https://dk521123.hatenablog.com/entry/2026/01/24/003046




以上の内容はhttps://dk521123.hatenablog.com/entry/2026/01/23/222936より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14