ローカルで立てたAirflowで開発中のアプリDB→BigQueryのDAGがざっくり動いたので、今度は前回の更新日以降を差分同期したいと思った。
DBの種類がMySQLなのでちょっと違うけどやりたいのはまさにこれ。
前回実行時間と今回の実行時間があれば、SQLのWHERE句に入れればやりたいことできそう。
https://airflow.readthedocs.io/en/1.10.3post1/_modules/airflow/contrib/operators/mysql_to_gcs.html
ソースをみるとsqlがtemplate_fieldsに入っているのでSQLの中に{{ execution_date }}とやれば入りそうだけど、python側で条件分岐などを行いたいのと内部理解も含めてcontextを自分で使ってみる。
下記のようにwith文で処理しているのでdagに入っているのかと思ったら、それっぽいのは見当たらず。
with DAG(
'mysql2gcs2bq',
default_args=default_args,
description='admin tables to gcs to bq',
schedule_interval=timedelta(days=1),
) as dag:
t1 = DummyOperator(
task_id='start'
)
# ....
airflow.models.dag — Airflow Documentation
PythonOperatorはprovide_contextをTrueにすることでkwargsとして渡してくれるらしいのでそれでMySqlToGoogleCloudStorageOperatorを囲ってみることにした。
airflow.operators.python_operator — Airflow Documentation
まだデバッグ中のコードだけどこんな感じ。contextを受け取るためには**kwargsが必要。
def gcs2bqOp(name: str, **kwargs):
print('gcs2bqOp {} kwargs:{}'.format(name, kwargs))
いろんなパス情報は関数化してる。
printの結果を見てみる。整形するとこんなの。
execution_date、prev_execution_date_successが今回必要なものっぽいが、datetimeではなく見慣れぬPendulumという型で入っているようす。直訳すると振り子。
[2020-04-14 01:56:32,510] {{logging_mixin.py:112}} INFO - mysql2gcsOp rhistories:
{
'conf': <airflow.configuration.AirflowConfigParser object at 0x7ff6c4b9ef90>,
'dag': <DAG: mysql2gcs2bq>,
'ds': '2020-04-14',
'next_ds': '2020-04-14',
'next_ds_nodash': '20200414',
'prev_ds': '2020-04-14',
'prev_ds_nodash': '20200414',
'ds_nodash': '20200414',
'ts': '2020-04-14T01:56:20.076503+00:00',
'ts_nodash': '20200414T015620',
'ts_nodash_with_tz': '20200414T015620.076503+0000',
'yesterday_ds': '2020-04-13',
'yesterday_ds_nodash': '20200413',
'tomorrow_ds': '2020-04-15',
'tomorrow_ds_nodash': '20200415',
'END_DATE': '2020-04-14',
'end_date': '2020-04-14',
'dag_run': <DagRun mysql2gcs2bq @ 2020-04-14 01:56:20.076503+00:00: manual__2020-04-14T01:56:20.076503+00:00, externally triggered: True>,
'run_id': 'manual__2020-04-14T01:56:20.076503+00:00',
'execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>,
'prev_execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>,
'prev_execution_date_success': <Proxy at 0x7ff6b10aa140 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7ff6b1030950>>,
'prev_start_date_success': <Proxy at 0x7ff6b1037190 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7ff6b106d9e0>>,
'next_execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>,
'latest_date': '2020-04-14',
'macros': <module 'airflow.macros' from '/usr/local/lib/python3.7/site-packages/airflow/macros/__init__.py'>,
'params': {},
'tables': None,
'task': <Task(PythonOperator): py_rhistories_mysql_to_bq>,
'task_instance': <TaskInstance: mysql2gcs2bq.py_rhistories_mysql_to_bq 2020-04-14T01:56:20.076503+00:00 [running]>,
'ti': <TaskInstance: mysql2gcs2bq.py_rhistories_mysql_to_bq 2020-04-14T01:56:20.076503+00:00 [running]>,
'task_instance_key_str': 'mysql2gcs2bq__py_rhistories_mysql_to_bq__20200414',
'test_mode': False,
'var': {'value': None,
'json': None},
'inlets': [],
'outlets': [],
'templates_dict': None
}datetimeを便利にしたものらしい。公式サイトがPythonライブラリとは思えないほどおしゃれ。jsだとmoment.jsとかdayjsとか、PHPだとCarbonとか日付関係の便利ライブラリだいたいあるね。
pendulum.eustace.io
便利そう。
これをいい感じにSQLに入れたりすればできそう。
成功日時の無い初回だけはちょっとなにかする必要があるか。
追記:templateで十分だし、contextを使うのは難しかった
ここまでやったところで、PythonOperator内で更にMySqlToGoogleCloudStorageOperatorを使うのとそのままでは動かず、面倒そうだった。
更にどうやらsqlはAirflowによっていわゆるtemplatedされているため、変数だけでなくjinjaテンプレートでif文とかも使えるのだった。
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
{% if prev_execution_date_success %}WHERE updated_at >= "{{ prev_execution_date_success.fomart("%Y-%m-%d %H:%M:%S") }}"{% endif %}今回はprev_execution_date_successが入っているときのみWhere句を入れたいのでこれで十分だった。
あれPendulumじゃない・・・・
上記のを試してみるけど、SQLが空になってしまうので試行錯誤してたら下記のようなエラーが。
[2020-04-14 05:51:21,872] {{taskinstance.py:1145}} ERROR - 'datetime.datetime object' has no attribute 'format'え、Pendulumじゃなくて、datetime.datetime objectだ・・・。
contextとMySqlToGoogleCloudStorageOperatorでTemplateで使ってる変数が違うのだろうか。
あ、タイムゾーン・・・
とりあえずstrftimeにして動いたけど
```
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
{% if prev_execution_date_success %}WHERE updated_at >= "{{ prev_execution_date_success.strftime("%Y-%m-%d %H:%M:%S") }}"{% endif %}
```
実行ログをみると、当たり前だけどUTCのままの時間を使ってた
```[2020-04-14 05:57:34,313] {{mysql_to_gcs.py:92}} INFO - Executing:
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
WHERE updated_at >= "2020-04-14 05:47:45"```
ドキュメントを見てみるとensure_utcというそれっぽいオプションが。
airflow.contrib.operators.mysql_to_gcs — Airflow Documentation
Trueにして実行したところ、クエリの前にMySQL側のtime_zoneを変更してくれているよう。これなら大丈夫な予感。
```
[2020-04-14 06:08:15,877] {{mysql_to_gcs.py:90}} INFO - Executing: SET time_zone = '+00:00'
[2020-04-14 06:08:15,880] {{mysql_to_gcs.py:92}} INFO - Executing:
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
WHERE updated_at >= "2020-04-14 05:57:30"
```
- 作者:西田 圭介
- 発売日: 2017/09/22
- メディア: 単行本(ソフトカバー)