Airflow の DAG ファイルのテスト方法について、調べた内容をまとめます。
環境
実行環境は、下記以前の投稿で作成したローカル上の Airflow 環境になります。
Apache Airflow をローカル環境にインストールする - goodbyegangsterのブログ
参考ドキュメント
参考にした資料たちです。
- Airflow 公式チュートリアル
- Airflow 公式 ベストプラクティス
- Astronomer Testing
- Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests
- Testing in Airflow Part 2 — Integration Tests and End-To-End Pipeline Tests
example の DAG ファイルを無効化
Airflow 側で用意してくれている example や tutorial 用の DAG フォルダを無効化しておきます。これら DAG ファイル内の記述のせいで、煩わしい警告メッセージが大量に出力されてしまいます。
airflow.cfg
# Whether to load the DAG examples that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production # environment load_examples = False
config reference load_examples
テスト対象の DAG ファイル
利用できる Airflow CLI Commnad たち
CLI Command で利用できそうなものを確認。
DAG の正常登録を確認
airflow dags show コマンドにて、DAG が正しく登録されているかどうかを確認できます。
$ airflow dags show sample_test_dag
[2022-08-04 20:05:48,219] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
digraph sample_test_dag {
graph [label=sample_test_dag labelloc=t rankdir=LR]
get_version [color="#000000" fillcolor="#ffefeb" label=get_version shape=rectangle style="filled,rounded"]
print_version [color="#000000" fillcolor="#ffefeb" label=print_version shape=rectangle style="filled,rounded"]
get_version -> print_version
}
airflow tasks list コマンドでも確認可能です。
$ airflow tasks list -t sample_test_dag
<Task(_PythonDecoratedOperator): get_version>
<Task(_PythonDecoratedOperator): print_version>
DAG および Task の動作テスト
実際に DAG や Task を動かして、処理結果を確認することができます。
airflow dags test コマンドでは、指定した DAG を実行することができます。実行した場合、メタデータベース上にも JOB 実行の実績が記録されます。
$ airflow dags test sample_test_dag 2022-08-04
[2022-08-04 20:09:36,434] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
[2022-08-04 20:09:36,673] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [queued]>']
print: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-04 20:09:41,763] {backfill_job.py:378} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2022-08-04 20:09:41,784] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample_test_dag.print_version backfill__2022-08-04T00:00:00+00:00 [queued]>']
receive: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-04 20:09:46,652] {dagrun.py:562} INFO - Marking run <DagRun sample_test_dag @ 2022-08-04T00:00:00+00:00: backfill__2022-08-04T00:00:00+00:00, externally triggered: False> successful
[2022-08-04 20:09:46,653] {dagrun.py:622} INFO - DagRun Finished: dag_id=sample_test_dag, execution_date=2022-08-04T00:00:00+00:00, run_id=backfill__2022-08-04T00:00:00+00:00, run_start_date=2022-08-04 11:09:36.603766+00:00, run_end_date=2022-08-04 11:09:46.653258+00:00, run_duration=10.049492, state=success, external_trigger=False, run_type=backfill, data_interval_start=2022-08-04T00:00:00+00:00, data_interval_end=2022-08-05T00:00:00+00:00, dag_hash=None
[2022-08-04 20:09:46,653] {backfill_job.py:378} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-08-04 20:09:46,660] {backfill_job.py:879} INFO - Backfill done. Exiting.
airflow tasks test コマンドでは、Task 単位で実行することができます。Task 単位で実行した場合、メタデータベース上には JOB 実行の実績が記録されません。
$ airflow tasks test sample_test_dag get_version 2022-08-04
airflow tasks test sample_test_dag get_version 2022-08-04
[2022-08-05 01:15:43,066] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,124] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:15:43,141] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:15:43,142] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:15:43,142] {taskinstance.py:1357} INFO - Starting attempt 1 of 2
[2022-08-05 01:15:43,142] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:15:43,143] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): get_version> on 2022-08-04 00:00:00+00:00
[2022-08-05 01:15:43,185] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample_test_dag
AIRFLOW_CTX_TASK_ID=get_version
AIRFLOW_CTX_EXECUTION_DATE=2022-08-04T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-08-04T00:00:00+00:00
print: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:15:43,186] {python.py:173} INFO - Done. Returned value was: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:15:43,198] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample_test_dag, task_id=get_version, execution_date=20220804T000000, start_date=20220804T110936, end_date=20220804T161543
Dependency に関する警告が出力されています。下記の GitHub によると、Xcoms を利用しているために起こる問題らしいです。ただ、TaskFlow API Task で書けば警告でないよみたいなコメントもあるのですが、僕の環境では警告メッセージが出力されていました。。。
"already registered for DAG" when dynamically generate tasks
taskmixin.py で除外して出力すると、きれいに表示してくれます。
$ airflow tasks test sample_test_dag get_version 2022-08-04 | grep -v "taskmixin.py"
[2022-08-05 01:17:44,607] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
[2022-08-05 01:17:44,652] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:17:44,658] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:17:44,658] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:17:44,658] {taskinstance.py:1357} INFO - Starting attempt 1 of 2
[2022-08-05 01:17:44,658] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:17:44,659] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): get_version> on 2022-08-04 00:00:00+00:00
[2022-08-05 01:17:44,695] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample_test_dag
AIRFLOW_CTX_TASK_ID=get_version
AIRFLOW_CTX_EXECUTION_DATE=2022-08-04T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-08-04T00:00:00+00:00
print: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:17:44,696] {python.py:173} INFO - Done. Returned value was: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:17:44,710] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample_test_dag, task_id=get_version, execution_date=20220804T000000, start_date=20220804T110936, end_date=20220804T161744
「メタデータベース上に実績が記録されないのであれば、XComs を利用するような処理はテストできないのでは?」と思ったのですが、問題なく実行できました。下記の Stackoverflow によると、JOB 実行の履歴が残らないだけで、XComs にはアクセスがされているようです。
How to test Apache Airflow tasks that uses XCom
DAG Definition Tests
Python の unittest や pytest を利用して、宣言した DAG の定義情報を Validation する方法です。
テストコード
tests/test.py
実行
$ pytest -s tests/test.py
=============================================================================== test session starts ===============================================================================
platform linux -- Python 3.7.13, pytest-7.1.2, pluggy-1.0.0
rootdir: /home/mikochi/work/airflow
plugins: anyio-3.6.1
collected 1 item
tests/test.py [2022-08-05 01:51:31,400] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
<class 'airflow.models.dag.DAG'>
{'_access_control': None,
'_dag_id': 'sample_test_dag',
'_default_view': 'grid',
'_description': 'A sample test DAG',
'_max_active_tasks': 16,
'_pickle_id': None,
'_task_group': <airflow.utils.task_group.TaskGroup object at 0x7f0a5da5b610>,
'catchup': False,
'dagrun_timeout': None,
'default_args': {'retries': 1, 'retry_delay': datetime.timedelta(seconds=300)},
'doc_md': 'This is a sample test DAG.',
'edge_info': {},
'end_date': DateTime(2022, 12, 31, 0, 0, 0, tzinfo=Timezone('UTC')),
'fileloc': '/home/mikochi/work/airflow/dags/sample_test.py',
'has_on_failure_callback': False,
'has_on_success_callback': False,
'is_paused_upon_creation': None,
'jinja_environment_kwargs': None,
'last_loaded': datetime.datetime(2022, 8, 4, 16, 51, 31, 479758, tzinfo=Timezone('UTC')),
'max_active_runs': 16,
'on_failure_callback': None,
'on_success_callback': None,
'orientation': 'LR',
'params': <airflow.models.param.ParamsDict object at 0x7f0a5e862490>,
'partial': False,
'render_template_as_native_obj': False,
'safe_dag_id': 'sample_test_dag',
'schedule_interval': datetime.timedelta(days=1),
'sla_miss_callback': None,
'start_date': DateTime(2022, 8, 1, 0, 0, 0, tzinfo=Timezone('UTC')),
'tags': ['sample'],
'task_count': 2,
'task_dict': {'get_version': <Task(_PythonDecoratedOperator): get_version>,
'print_version': <Task(_PythonDecoratedOperator): print_version>},
'template_searchpath': None,
'template_undefined': <class 'jinja2.runtime.StrictUndefined'>,
'timetable': <airflow.timetables.interval.DeltaDataIntervalTimetable object at 0x7f0a5da5b2d0>,
'timezone': Timezone('UTC'),
'user_defined_filters': None,
'user_defined_macros': None}
tasks: ['get_version', 'print_version']
.
DagBag とは
airflow.models.dagbag とは、以下とのこと。
A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks.
dagbag オブジェクトに dag_id を指定して get_dag() メソッドを実行すると、該当 dag_id の DAG オブジェクトを取得でき、登録された情報を参照できます。その値を確認して、設定忘れがないか(catchup が無効であるかとか)を確認することができます。
Unit Test / End-to-End Pipeline Tests (Data Integrity Testing)
PythonOperator は、それ自体が Python の関数となるため、通常の Python プログラムのように単体テストのコードを書けば良いわけですが、外部のデータベース上データを操作する場合にどうするのか、適当なものがないか調べてみたのですが、やっぱり大変ですね。これはデータパイプライン全体のテストを実行する場合と同じですが、結局テストデータを用意して愚直に実施するしかないのかな、と。
以下のブログでは、その方法が紹介されていましたが、結局以下のような手順でした。
- テストデータを用意
- Airflow が提供する REST API を叩いて DAG を実行
- DAG の終了を待って、データ結果を確認
Testing in Airflow Part 2 — Integration Tests and End-To-End Pipeline Tests
近道はない、と。
Airflow API はこちら。