
はじめに
こんにちは。Analyticsチームの上田です。今回は小ネタとして、Park Directのデータ基盤ではどのようにデータ取り込みの正常性をチェックしているか?をご紹介します。 手軽さ (仕組みの単純さ・メンテナンスのしやすさ) を重視した方法ですので、少人数 (1~3名) で小~中規模のデータ基盤を運用している方の参考になるかもしれません。
データ取り込みの正常性チェック
Park Directのデータ基盤では、TROCCO等を利用して各データソースのデータをBigQueryに日次で取り込んでいます。 データ取り込みの正常性チェックでは、主に日次のデータ取り込みが意図せず停止・遅延・転送漏れしていないか?をチェックしています *1。
実現方法
1. TROCCOの通知機能を利用した方法
TROCCOでは、各テーブルの転送設定ごとに、Slack等への通知を設定することができます*2。弊チームの場合は、次の条件に該当する場合に、Slackへ通知を飛ばしています。
- 転送ジョブが失敗した場合
- 転送したレコードが0件以下の場合
- 転送をスキップした行が1つ以上存在する場合
- 転送時間がxx分を超えた場合 (テーブル毎に分数は調整)
2. BigQueryデータセットのメタデータを利用した方法
TROCCOでデータ取り込みを行ってないデータソースについては、前述の方法が利用できません。そのため、BigQueryデータセットのメタデータを利用した仕組みも用意しています。 ここで、BigQueryデータセットのメタデータとは、各データセットに存在する {データセット名}.__TABLES__ (View) を指します。このViewには次の列が存在します*3。
| 列名 | 説明 |
|---|---|
| project_id | データセットが所属するGoogleCloudプロジェクトのID |
| dataset_id | データセット名 |
| table_id | テーブル名 |
| creation_time | テーブルの作成時刻 (UNIX TIME) |
| last_modified_time | テーブルの最終変更時刻 (UNIX TIME) |
| row_count | テーブルの行数 |
| size_bytes | テーブルのデータサイズ |
| type | テーブルの種類 (1: テーブル、View: 2) |
弊チームの場合は、データ基盤 (BigQuery) 上の全てのテーブル (Viewは除く) について、上記Viewのレコードを日次で蓄積し、各テーブルの行数の増加量が閾値を超えた場合にSlackに通知を飛ばしています。 具体的な実現方法は次の通りです。
2.1. Viewのレコードの日次蓄積
蓄積には、次のようなCloud Run関数を利用しています。関数はCloud Schedulerで日次実行しています。また、蓄積先のデータセット・テーブルは、下記の関数外で事前に作成しています。
import base64 import functions_framework from google.cloud import bigquery @functions_framework.cloud_event def update_tables_meta(cloud_event): """ GoogleCloudプロジェクト内の全てのデータセット名のメタデータ (__TABLES__ View) を、指定したデータセット内のテーブルにINSERTする関数 """ # Step1. プロジェクトIDとBigQueryクライアントを設定 project_id = "{対象のGoogleCloudプロジェクトのID}" client = bigquery.Client(project=project_id) # Step2. プロジェクト内の全てのデータセット情報を取得 datasets = client.list_datasets() # Step3. 取得したデータセット情報を利用し、テーブルの __TABLES__ Viewのデータを一括で取得・蓄積用テーブルにINSERTするためのSQLを作成 # SQLはCTE部分→SELECT部分→INSERT部分の順に作成する dataset_ids = [dataset.dataset_id for dataset in datasets] with_clause = "WITH raw_meta AS (" for dataset_id in dataset_ids: with_clause += f"SELECT * FROM `{project_id}.{dataset_id}.__TABLES__` UNION ALL " with_clause = with_clause.rstrip(" UNION ALL ") + ")" # 末尾の余分な " UNION ALL " を削除 query = f""" {with_clause} SELECT CURRENT_DATE('Asia/Tokyo') AS insert_date , dataset_id , table_id , CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', TIMESTAMP_MILLIS(creation_time), 'Asia/Tokyo') AS DATETIME) AS creation_time , CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', TIMESTAMP_MILLIS(last_modified_time), 'Asia/Tokyo') AS DATETIME) AS last_modified_time , row_count , size_bytes FROM raw_meta WHERE type = 1 """ insert_query = f""" INSERT INTO `{project_id}.{Viewのデータの蓄積先データセット名}.{Viewのデータの蓄積先テーブル名}` ( insert_date , dataset_id , table_id , creation_time , last_modified_time , row_count , size_bytes ) {query} """ # Step4. SQLを実行 job = client.query(insert_query) job.result()
この関数によって蓄積される、テーブルのメタデータは次のようになります。
| insert_date*4 | dataset_id | table_id | creation_time | last_modified_time | row_count | size_bytes |
|---|---|---|---|---|---|---|
| 2024-12-14 | データセット名AAA | テーブル名aaa | 2024-12-14T01:08:34 | 2024-12-14T01:08:38 | 100025 | 30281289 |
| 2024-12-14 | データセット名BBB | テーブル名bbb | 2024-12-14T01:09:02 | 2024-12-14T01:09:08 | 17011 | 1878181 |
| 2024-12-15 | データセット名AAA | テーブル名aaa | 2024-12-15T01:08:27 | 2024-12-15T01:08:30 | 104312 | 30348911 |
| 2024-12-15 | データセット名BBB | テーブル名bbb | 2024-12-15T01:08:50 | 2024-12-15T01:08:53 | 17262 | 1879255 |
2.2. 各テーブルの行数の増加率が閾値を超えた場合にSlackに通知
上記のテーブルメタデータを元に、データ取り込みの成否チェックを行っています。
弊チームの場合は、BIツールにRedashを利用していることから、Redashのアラート機能を利用し、
前日→当日の行数増加率が「直近1週間の最大行数増加率の10%pt上を上回る」または「最小行数増加率の10%pt下を下回る」テーブルをSQLで抽出し、Slackに通知しています。
抽出用のSQL (BigQuery) は次のようになります。
-- BigQuery WITH -- テーブルメタデータを格納するテーブルから、各テーブルの直近8日分のメタデータを集計 recent_data AS ( SELECT insert_date , dataset_id , table_id , row_count , LAG(row_count, 1) OVER (PARTITION BY dataset_id, table_id ORDER BY insert_date) AS prev_row_count -- 前日のレコード数 FROM `{対象のGoogleCloudプロジェクトのID}.{メタデータの蓄積先データセット名}.{メタデータの蓄積先テーブル名}` WHERE insert_date > DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY) ) -- 各テーブルについて、前日~今日にかけての行数増加率を集計 , today_growth AS ( SELECT dataset_id , table_id , row_count , row_count - prev_row_count as today_growth -- 前日との差分から今日の増減量を取得 , safe_divide((row_count - prev_row_count), prev_row_count) AS today_growth_rate -- 前日との差分から今日の行数増加率を取得 FROM recent_data WHERE insert_date = CURRENT_DATE() ) -- 各テーブルについて、直近1週間(当日除く) の最大行数増加率・最小行数増加率を集計 , min_max_data AS( SELECT dataset_id , table_id , MIN(safe_divide((row_count - prev_row_count), prev_row_count)) AS min_growth_rate , MAX(safe_divide((row_count - prev_row_count), prev_row_count)) AS max_growth_rate FROM recent_data WHERE insert_date <> CURRENT_DATE() GROUP BY dataset_id , table_id ) -- today_growth と min_max_data を結合し、「前日→当日の行数増加率が直近1週間の最大行数増加率の10%pt上を上回る、または最小行数増加率の10%pt下を下回るテーブルのみを異常 (通知対象) とする判定フラグ」を付与 , pre_output as ( SELECT mm.dataset_id as `データセット名` , mm.table_id as `テーブル名` , td.row_count as `レコード数` , td.today_growth as `本日取込分>行数増加量` , td.today_growth_rate as `本日取込分>行数増加率` , mm.max_growth_rate as `直近7日間>最大行数増加率` , mm.min_growth_rate as `直近7日間>最小行数増加率` , CASE WHEN td.today_growth_rate > mm.max_growth_rate + 0.1 THEN TRUE WHEN td.today_growth_rate < mm.min_growth_rate - 0.1 THEN TRUE ELSE FALSE END AS `通知要否判定フラグ` FROM min_max_data AS mm INNER JOIN today_growth AS td ON mm.dataset_id = td.dataset_id AND mm.table_id = td.table_id ) -- 通知対象のテーブルに関するメタデータのみを抽出する (=Redashのアラートを発火させる) SELECT `データセット名` , `テーブル名` , `レコード数` , `本日取込分>行数増加量` , `本日取込分>行数増加率` , `直近7日間>最大行数増加率` , `直近7日間>最小行数増加率` FROM pre_output WHERE `アラート通知判定用フラグ` IS TRUE -- ▼通知対象から除外する条件があれば追加する -- 例: AND `テーブル名` <> 'xxx' ;
所感
今回ご紹介した方法による正常性チェックは、データ基盤の運用を開始した最初期 (2023年12月) から使用しています。 その間に感じた、良い点と課題は次の通りです。
▼TROCCOの通知機能を利用した方法
- 良い点
- TROCCOのWEBコンソール上操作だけ (=ノーコード) で全ての通知を設定できる
- 課題
- 転送設定 (≒テーブル) 毎に通知を設定する必要がある
- 100テーブル転送している場合は、100個分の設定が必要
- 転送設定 (≒テーブル) 毎に通知を設定する必要がある
▼BigQueryデータセットのメタデータを利用した方法
- 良い点
- データセットやテーブルが増えてもコードをメンテナンスする必要がない
- 使用開始から現在まで、Cloud Run関数のコードは一度もメンテナンスせずに済んだ
- 万が一「TROCCOの通知機能を利用した方法」の通知設定の追加を失念した場合でも、こちらの方法でカバーできる
- 通知条件をSQLで柔軟に調整できる
- データ基盤上のデータ量の推移も同時に出せるので、BigQueryやTROCCOのコスト予測に活用できる
- データセットやテーブルが増えてもコードをメンテナンスする必要がない
- 課題
- 重複行のチェックなど、レコードの値に基づくチェックはできない
おわりに
本記事では、Park Directデータ基盤における、データ取り込みの正常性チェックの方法についてご紹介しました。 アドカレ期間以外にも、このような小ネタは定期的にポストしていきたいと思います。 今後もよろしくお願いいたします!
*1:重複行や意図しない値の混入が発生していないか?などもチェックしていますが、それには別の方法を採用しています。理由は「所感」セクションをご覧ください。
*2:転送設定に関する便利な設定を行う - TROCCO HELP CENTER
*3:BigQueryの公式ドキュメントには __TABLES__ の解説が記載されていないため、列の説明は独自調査によるものです
*4:その行のメタデータの記録日