以下の内容はhttps://nealle-dev.hatenablog.com/entry/2024/12/17/02より取得しました。


TROCCO & BigQueryテーブルメタデータで手軽にデータ取り込みの正常性をチェックする

TROCCO & BigQueryテーブルメタデータで手軽にデータ取り込みの正常性をチェックする
本記事はニーリーアドベントカレンダー2024の17日目の記事 その2 です。

はじめに

こんにちは。Analyticsチームの上田です。今回は小ネタとして、Park Directのデータ基盤ではどのようにデータ取り込みの正常性をチェックしているか?をご紹介します。 手軽さ (仕組みの単純さ・メンテナンスのしやすさ) を重視した方法ですので、少人数 (1~3名) で小~中規模のデータ基盤を運用している方の参考になるかもしれません。

データ取り込みの正常性チェック

Park Directのデータ基盤では、TROCCO等を利用して各データソースのデータをBigQueryに日次で取り込んでいます。 データ取り込みの正常性チェックでは、主に日次のデータ取り込みが意図せず停止・遅延・転送漏れしていないか?をチェックしています *1

実現方法

1. TROCCOの通知機能を利用した方法

TROCCOでは、各テーブルの転送設定ごとに、Slack等への通知を設定することができます*2。弊チームの場合は、次の条件に該当する場合に、Slackへ通知を飛ばしています。

  • 転送ジョブが失敗した場合
  • 転送したレコードが0件以下の場合
  • 転送をスキップした行が1つ以上存在する場合
  • 転送時間がxx分を超えた場合 (テーブル毎に分数は調整)

2. BigQueryデータセットのメタデータを利用した方法

TROCCOでデータ取り込みを行ってないデータソースについては、前述の方法が利用できません。そのため、BigQueryデータセットのメタデータを利用した仕組みも用意しています。 ここで、BigQueryデータセットのメタデータとは、各データセットに存在する {データセット名}.__TABLES__ (View) を指します。このViewには次の列が存在します*3

列名 説明
project_id データセットが所属するGoogleCloudプロジェクトのID
dataset_id データセット名
table_id テーブル名
creation_time テーブルの作成時刻 (UNIX TIME)
last_modified_time テーブルの最終変更時刻 (UNIX TIME)
row_count テーブルの行数
size_bytes テーブルのデータサイズ
type テーブルの種類 (1: テーブル、View: 2)

弊チームの場合は、データ基盤 (BigQuery) 上の全てのテーブル (Viewは除く) について、上記Viewのレコードを日次で蓄積し、各テーブルの行数の増加量が閾値を超えた場合にSlackに通知を飛ばしています。 具体的な実現方法は次の通りです。

2.1. Viewのレコードの日次蓄積

蓄積には、次のようなCloud Run関数を利用しています。関数はCloud Schedulerで日次実行しています。また、蓄積先のデータセット・テーブルは、下記の関数外で事前に作成しています。

import base64
import functions_framework
from google.cloud import bigquery


@functions_framework.cloud_event
def update_tables_meta(cloud_event):
   """
   GoogleCloudプロジェクト内の全てのデータセット名のメタデータ (__TABLES__ View) を、指定したデータセット内のテーブルにINSERTする関数
   """

    # Step1. プロジェクトIDとBigQueryクライアントを設定
    project_id = "{対象のGoogleCloudプロジェクトのID}"
    client = bigquery.Client(project=project_id)

    # Step2. プロジェクト内の全てのデータセット情報を取得
    datasets = client.list_datasets()

    # Step3. 取得したデータセット情報を利用し、テーブルの __TABLES__ Viewのデータを一括で取得・蓄積用テーブルにINSERTするためのSQLを作成
    # SQLはCTE部分→SELECT部分→INSERT部分の順に作成する
    dataset_ids = [dataset.dataset_id for dataset in datasets]

    with_clause = "WITH raw_meta AS ("
    for dataset_id in dataset_ids:
        with_clause += f"SELECT * FROM `{project_id}.{dataset_id}.__TABLES__` UNION ALL "
    with_clause = with_clause.rstrip(" UNION ALL ") + ")" # 末尾の余分な " UNION ALL " を削除

    query = f"""
    {with_clause}
    SELECT
      CURRENT_DATE('Asia/Tokyo') AS insert_date
    , dataset_id
    , table_id
    , CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', TIMESTAMP_MILLIS(creation_time), 'Asia/Tokyo') AS DATETIME) AS creation_time
    , CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', TIMESTAMP_MILLIS(last_modified_time), 'Asia/Tokyo') AS DATETIME) AS last_modified_time
    , row_count
    , size_bytes
    FROM 
      raw_meta
    WHERE 
      type = 1
    """

    insert_query = f"""
    INSERT INTO `{project_id}.{Viewのデータの蓄積先データセット名}.{Viewのデータの蓄積先テーブル名}` (
      insert_date
    , dataset_id
    , table_id
    , creation_time
    , last_modified_time
    , row_count
    , size_bytes
    )
    {query}
    """

    # Step4. SQLを実行
    job = client.query(insert_query)
    job.result()  

この関数によって蓄積される、テーブルのメタデータは次のようになります。

insert_date*4 dataset_id table_id creation_time last_modified_time row_count size_bytes
2024-12-14 データセット名AAA テーブル名aaa 2024-12-14T01:08:34 2024-12-14T01:08:38 100025 30281289
2024-12-14 データセット名BBB テーブル名bbb 2024-12-14T01:09:02 2024-12-14T01:09:08 17011 1878181
2024-12-15 データセット名AAA テーブル名aaa 2024-12-15T01:08:27 2024-12-15T01:08:30 104312 30348911
2024-12-15 データセット名BBB テーブル名bbb 2024-12-15T01:08:50 2024-12-15T01:08:53 17262 1879255

2.2. 各テーブルの行数の増加率が閾値を超えた場合にSlackに通知

上記のテーブルメタデータを元に、データ取り込みの成否チェックを行っています。 弊チームの場合は、BIツールにRedashを利用していることから、Redashのアラート機能を利用し、 前日→当日の行数増加率が「直近1週間の最大行数増加率の10%pt上を上回る」または「最小行数増加率の10%pt下を下回る」テーブルをSQLで抽出し、Slackに通知しています。 抽出用のSQL (BigQuery) は次のようになります。

-- BigQuery
WITH

-- テーブルメタデータを格納するテーブルから、各テーブルの直近8日分のメタデータを集計
recent_data AS (
SELECT
  insert_date
, dataset_id
, table_id
, row_count
, LAG(row_count, 1) OVER (PARTITION BY dataset_id, table_id ORDER BY insert_date) AS prev_row_count -- 前日のレコード数
FROM
  `{対象のGoogleCloudプロジェクトのID}.{メタデータの蓄積先データセット名}.{メタデータの蓄積先テーブル名}`
WHERE
  insert_date > DATE_SUB(CURRENT_DATE(), INTERVAL 8 DAY) 
)

-- 各テーブルについて、前日~今日にかけての行数増加率を集計
, today_growth AS (
SELECT
  dataset_id
, table_id
, row_count
, row_count - prev_row_count as today_growth -- 前日との差分から今日の増減量を取得
, safe_divide((row_count - prev_row_count), prev_row_count) AS today_growth_rate -- 前日との差分から今日の行数増加率を取得
FROM
  recent_data
WHERE
  insert_date = CURRENT_DATE()
)

-- 各テーブルについて、直近1週間(当日除く) の最大行数増加率・最小行数増加率を集計
, min_max_data AS(
SELECT
  dataset_id
, table_id
, MIN(safe_divide((row_count - prev_row_count), prev_row_count)) AS min_growth_rate
, MAX(safe_divide((row_count - prev_row_count), prev_row_count)) AS max_growth_rate
FROM
  recent_data
WHERE 
  insert_date <> CURRENT_DATE()
GROUP BY
  dataset_id
, table_id
)

-- today_growth と min_max_data を結合し、「前日→当日の行数増加率が直近1週間の最大行数増加率の10%pt上を上回る、または最小行数増加率の10%pt下を下回るテーブルのみを異常 (通知対象) とする判定フラグ」を付与
, pre_output as (
SELECT
  mm.dataset_id as `データセット名`
, mm.table_id as `テーブル名`
, td.row_count as `レコード数`
, td.today_growth as `本日取込分>行数増加量`
, td.today_growth_rate as `本日取込分>行数増加率`
, mm.max_growth_rate as `直近7日間>最大行数増加率`
, mm.min_growth_rate as `直近7日間>最小行数増加率`
, CASE
    WHEN td.today_growth_rate > mm.max_growth_rate + 0.1 THEN TRUE
    WHEN td.today_growth_rate < mm.min_growth_rate - 0.1 THEN TRUE
    ELSE FALSE
  END AS `通知要否判定フラグ`
FROM
             min_max_data AS mm
  INNER JOIN today_growth AS td
          ON mm.dataset_id = td.dataset_id
             AND mm.table_id = td.table_id
)

-- 通知対象のテーブルに関するメタデータのみを抽出する (=Redashのアラートを発火させる)
SELECT
  `データセット名`
, `テーブル名`
, `レコード数`
, `本日取込分>行数増加量`
, `本日取込分>行数増加率`
, `直近7日間>最大行数増加率`
, `直近7日間>最小行数増加率`
FROM
  pre_output
WHERE
  `アラート通知判定用フラグ` IS TRUE 
  -- ▼通知対象から除外する条件があれば追加する
  -- 例: AND `テーブル名` <> 'xxx'
;

所感

今回ご紹介した方法による正常性チェックは、データ基盤の運用を開始した最初期 (2023年12月) から使用しています。 その間に感じた、良い点と課題は次の通りです。

▼TROCCOの通知機能を利用した方法

  • 良い点
    • TROCCOのWEBコンソール上操作だけ (=ノーコード) で全ての通知を設定できる
  • 課題
    • 転送設定 (≒テーブル) 毎に通知を設定する必要がある
      • 100テーブル転送している場合は、100個分の設定が必要

▼BigQueryデータセットのメタデータを利用した方法

  • 良い点
    • データセットやテーブルが増えてもコードをメンテナンスする必要がない
      • 使用開始から現在まで、Cloud Run関数のコードは一度もメンテナンスせずに済んだ
    • 万が一「TROCCOの通知機能を利用した方法」の通知設定の追加を失念した場合でも、こちらの方法でカバーできる
    • 通知条件をSQLで柔軟に調整できる
    • データ基盤上のデータ量の推移も同時に出せるので、BigQueryやTROCCOのコスト予測に活用できる
  • 課題
    • 重複行のチェックなど、レコードの値に基づくチェックはできない

おわりに

本記事では、Park Directデータ基盤における、データ取り込みの正常性チェックの方法についてご紹介しました。 アドカレ期間以外にも、このような小ネタは定期的にポストしていきたいと思います。 今後もよろしくお願いいたします!

*1:重複行や意図しない値の混入が発生していないか?などもチェックしていますが、それには別の方法を採用しています。理由は「所感」セクションをご覧ください。

*2:転送設定に関する便利な設定を行う - TROCCO HELP CENTER

*3:BigQueryの公式ドキュメントには __TABLES__ の解説が記載されていないため、列の説明は独自調査によるものです

*4:その行のメタデータの記録日




以上の内容はhttps://nealle-dev.hatenablog.com/entry/2024/12/17/02より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14