以下の内容はhttps://techtekt.persol-career.co.jp/entry/tech/260331_02より取得しました。


Databricks のクエリ履歴ログを整理して、長期で振り返れる仕組みを作った話

自己紹介

こんにちは。データ・AIソリューション本部でデータエンジニアをしている宮下です。 これまで Databricks を使ったデータ基盤の構築・運用を担当しており、今回はクエリ履歴ログの長期蓄積と、テーブル参照状況の可視化について得た知見を共有します。


はじめに

この仕組みを作ったことで、以下のことができるようになりました。

  • 誰がいつどのクエリを実行したかを長期にわたって追跡できる
  • どのテーブルが誰にどれだけ参照されているかを日次で把握できる
  • テーブルの廃止・リネーム前に影響範囲をすぐ特定できる
  • 定期的な棚卸の際に過去のクエリ操作履歴を振り返れる

社内では Databricks SQL Warehouse をデータ分析基盤として運用しており、Read Only の SQL 実行基盤として一般利用者に提供しています。

Databricks には system.query.history というシステムテーブルがあり、SQL Warehouse 上で実行されたクエリの履歴(実行ユーザー・実行時刻・クエリ内容など)を参照できます。しかし実装に着手した 2024年11月時点では保持期間が約3ヶ月(Public Preview 仕様)しかなく、定期的な棚卸の際に過去の履歴を振り返ることができないと判断しました。そこで自前で蓄積する仕組みを構築することにしました。

補足: 本記事執筆時点(2025年12月)に確認したところ、system.query.history には約1年分のデータが保持されていました。仕様が改善されている可能性はありますが、長期間の振り返りには依然として自前での蓄積が有効です。

注記: 本記事の内容は執筆時点(2025年12月)の仕様に基づいています。system.query.history は Public Preview のため、保持期間や仕様が変更される可能性があります。最新情報は Databricks 公式ドキュメント をご確認ください。

本記事では、同様の課題を抱えている Databricks ユーザーの参考になればと思い、クエリ履歴の永続化からテーブル参照状況の日次記録までを実現した仕組みを紹介します。

プライバシーに関する注記: 本仕組みで収集するクエリ履歴・テーブル参照情報は、システム管理およびログ整理を目的としており、社内の情報セキュリティポリシーおよび利用規約に基づき運用しています。

対象読者: Databricks を運用しており、クエリ履歴ログの長期保存や棚卸時の振り返り基盤を検討している方


system.query.history とは

Databricks のシステムテーブルの一つで、SQL Warehouse 上で実行されたクエリの詳細情報を参照できます。 利用には 管理者権限またはビューの付与 が必要です。

⚠️ 監査ログ(system.access.audit)とは別物です Databricks には system.access.audit という監査ログ専用のシステムテーブルも存在します。 こちらはログイン・権限変更・オブジェクト操作などアカウント全体のアクティビティを記録するものです(保持期間はデフォルト1年)。 本記事で扱う system.query.historySQL Warehouse で実行されたクエリの実行内容・パフォーマンス情報に特化したテーブルであり、用途・内容ともに異なります。 参考:監査ログ システム テーブル リファレンス - Azure Databricks | Microsoft Learn

取得できる主な情報は以下のとおりです。

カテゴリ 主なカラム
Who(誰が) executed_by, executed_by_user_id
When(いつ) start_time, end_time, update_time(すべて UTC)
Where(どこで) compute(warehouse/serverless), workspace_id, account_id
What(何を) statement_text, statement_type, execution_status
PerfMetrics(性能) total_duration_ms, execution_duration_ms, read_rows, read_bytes など
ClientInfo client_application, client_driver
QuerySource query_source(job, notebook, alert_id など実行元情報)

導入の背景

以前は Databricks の管理コンソールや system.query.history を直接参照してクエリ履歴を確認していましたが、以下のような課題がありました。

  • 3ヶ月以前の履歴が参照できない:定期的な棚卸の際に過去のクエリ操作を振り返ることができない
  • テーブル単位の参照状況が見えない:どのテーブルが誰にどれだけ使われているか把握できず、廃止・リネームの影響範囲が不明
  • アクセス終了後のデータ反映タイムラグ:リアルタイムではなく当日分の確定データが必要

これらを解決するために、自前でクエリ履歴を蓄積する仕組みを構築することにしました。

技術選定の理由

今回の実装では以下の技術・ライブラリを採用しました。

技術 選定理由
PySpark(mapInPandas) Spark 分散処理を維持しながら Python ライブラリを使いたいため。Pandas UDF より柔軟なスキーマ変換が可能
sqlglot Python 製の SQL パーサーで Databricks SQL 方言に対応。正規表現によるテーブル名抽出と比較して、CTE・サブクエリの扱いが正確
Databricks Jobs 既存のワークフロー基盤をそのまま活用でき、追加のインフラが不要

設計上の意思決定

実装にあたり、いくつかの設計判断を行いました。

検討事項 採用した方針 理由
ロード方式 append(将来的に MERGE へ移行予定) 再実行時の重複防止が最終目標
タイムゾーン変換 UTC → JST に変換してから日付抽出 update_time が UTC のため、JST 基準の current_date() と比較するには変換が必須
スキーマ変更への対応 mergeSchema=true Public Preview テーブルのため、将来的なカラム追加に備える
Pandas UDF 不使用 性能・安定性の観点から採用しない

Step 1:クエリ履歴の収集・蓄積

コード全体

# ======================================================
# Purpose   : system.query.history からクエリ実行履歴を収集し、
#             長期蓄積・棚卸振り返り用テーブルとして保存する
#
# Source    : system.query.history
# Target    : <スキーマ名>.<テーブル名>(蓄積先テーブル)
# LoadType  : append(UPSERT は将来対応)
# Created   : 2025-12-16
# ======================================================

from pyspark.sql.functions import (
    to_date,
    current_date,
    from_utc_timestamp
)

# 元データ
query_history_df = spark.read.table("system.query.history")

# JST基準で本日分を抽出
today_df = (
    query_history_df
    .filter(
        to_date(
            from_utc_timestamp("update_time", "Asia/Tokyo")
        ) == current_date()
    )
)

# 本番投入
# ⚠️ 注意:insertInto() はテーブルが事前に存在している必要があります。
# 初回実行前に以下のような DDL でテーブルを作成しておいてください。
#
# CREATE TABLE IF NOT EXISTS <スキーマ名>.<テーブル名>
# USING DELTA
# AS SELECT * FROM system.query.history WHERE 1=0;
today_df \
    .write.mode("append") \
    .option("mergeSchema", "true") \
    .insertInto("<スキーマ名>.<テーブル名>")

ポイント解説

UTC → JST 変換が必要な理由

system.query.history の時刻カラム(update_time など)は すべて UTC で記録されています。 一方、Databricks の current_date() は Job の実行環境タイムゾーン(今回は JST)を返します。

変換なしで比較すると、UTC と JST の日付ズレにより 当日分のデータが一部抜ける 問題が発生します。

# NG: update_time が UTC のまま JST の current_date() と比較してしまう
.filter(to_date("update_time") == current_date())

# OK: JST に変換してから比較する
.filter(
    to_date(from_utc_timestamp("update_time", "Asia/Tokyo")) == current_date()
)

mergeSchema=true を指定する理由

system.query.history は Public Preview のテーブルです。 将来的にカラムが追加された場合でも、スキーマエラーで書き込みが止まらないよう mergeSchema=true を指定しています。


運用:Databricks Jobs による自動化

なぜ 22:30 に実行するか

社内では一般利用者の SQL Warehouse へのアクセスを 22時まで に制限しています。

22時ちょうどに実行すると、処理中のクエリが system.query.history にまだ反映されていない可能性があります。 そのため、30分のバッファを設けた 22:30 に実行 することで、当日分を取りこぼしなく収集できるようにしています。

Cron 式

Databricks Jobs のスケジュール設定(タイムゾーン:Asia/Tokyo)に以下の Cron 式を使用しています。

0 30 22 * * ?
フィールド 意味
0 0秒
30 30分
22 22時
* 毎日
* 毎月
曜日 ? 指定なし

注意:Databricks Jobs のスケジュール画面では、タイムゾーンを Asia/Tokyo に設定することで、UTC との混同を防げます。

Job 設定のポイント

設定項目 推奨内容
スケジュール 0 30 22 * * ?(Asia/Tokyo)
クラスター Job Cluster(SQL Warehouse は使用しない)
リトライ 1〜2回(システムテーブルの一時的な遅延に備える)

Step 2:SELECT クエリのテーブル参照状況を解析・記録

Step 1 で蓄積したクエリ履歴をもとに、SELECT 文を解析して「誰がいつどのテーブルを参照したか」を日次テーブルに記録します。

出力テーブルの構成

カラム名 内容
usage_date DATE 参照日(JST)
user_name STRING 実行ユーザー(executed_by
table_name STRING 参照テーブル名(スキーマ.テーブル 形式)

コード全体

# ============================================================
# query_history_user_table_usage_summary_daily(本番・全期間)
#
# Purpose:
#   SELECT クエリにおける
#   「誰が・いつ・どのテーブルを参照したか」を日次で記録
# ============================================================

from sqlglot import parse, exp
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import (
    col,
    to_date,
    from_utc_timestamp,
    current_timestamp
)

SRC_TABLE = "<カタログ名>.<スキーマ名>.<テーブル名>"
TGT_TABLE = "<カタログ名>.<スキーマ名>.<テーブル名(参照状況)>"

# SQL → 参照テーブル抽出
def extract_tables(sql: str):
    if not sql:
        return []
    try:
        tables = []
        for ast in parse(sql):
            for t in ast.find_all(exp.Table):
                if t.db:
                    tables.append(f"{t.db}.{t.name}")
                else:
                    tables.append(t.name)
        return tables
    except Exception:
        return []

# mapInPandas で分散解析
def parse_partition(pdf_iter):
    for pdf in pdf_iter:
        records = []
        for _, r in pdf.iterrows():
            # 同一SQL内の同一テーブルは1回のみ
            tables = set(extract_tables(r["statement_text"]))
            for t in tables:
                records.append({
                    "usage_date": r["usage_date"],
                    "user_name": r["user_name"],
                    "table_name": t,
                })
        if records:
            yield pd.DataFrame(records)

schema = StructType([
    StructField("usage_date", DateType(), True),
    StructField("user_name", StringType(), True),
    StructField("table_name", StringType(), True),
])

# 対象SQL抽出(当日分・JST)
base_df = (
    spark.table(SRC_TABLE)
    .withColumn(
        "usage_date",
        to_date(from_utc_timestamp("update_time", "Asia/Tokyo"))
    )
    .filter(col("statement_type") == "SELECT")
    .filter(
        col("usage_date")
        == to_date(from_utc_timestamp(current_timestamp(), "Asia/Tokyo"))
    )
    .select(
        "usage_date",
        col("executed_by").alias("user_name"),
        "statement_text"
    )
)

# 分散解析
parsed_df = base_df.mapInPandas(parse_partition, schema)

daily_usage_df = (
    parsed_df
    .filter(col("table_name").contains("."))  # temp / CTE 排除
    .select("usage_date", "user_name", "table_name")
    .dropDuplicates(["usage_date", "user_name", "table_name"])
)

# 書き込み(append)
# ⚠️ 注意:insertInto() はテーブルが事前に存在している必要があります。
# 初回実行前に以下のような DDL でテーブルを作成しておいてください。
#
# CREATE TABLE IF NOT EXISTS <スキーマ名>.<テーブル名(参照状況)> (
#   usage_date  DATE,
#   user_name   STRING,
#   table_name  STRING
# ) USING DELTA;
(
    daily_usage_df
    .write
    .mode("append")
    .option("mergeSchema", "true")
    .insertInto(TGT_TABLE)
)

ポイント解説

sqlglot による SQL 解析

クエリ本文(statement_text)から参照テーブルを抽出するために sqlglot を使用しています。 sqlglot は Python 製の SQL パーサーで、さまざまな SQL 方言に対応しています。

from sqlglot import parse, exp

def extract_tables(sql: str):
    for ast in parse(sql):
        for t in ast.find_all(exp.Table):
            if t.db:
                tables.append(f"{t.db}.{t.name}")  # スキーマ.テーブル
            else:
                tables.append(t.name)

t.db が存在する場合は スキーマ.テーブル 形式で記録し、テーブルの所在を明確にしています。

mapInPandas で分散処理

SQL 解析は Python ライブラリ(sqlglot)を使うため、Spark ネイティブの処理では実行できません。 mapInPandas を使うことで、各パーティションを Pandas DataFrame として受け取り、Python 処理を分散実行 できます。

Pandas UDF(@pandas_udf)と異なり、mapInPandas は入出力のスキーマを柔軟に変換できるため、 1行のクエリから複数行(複数テーブル)を展開するこのケースに適しています。

# 1クエリ → 複数テーブルへの展開も自然に記述できる
tables = set(extract_tables(r["statement_text"]))
for t in tables:
    records.append({ ... })

temp テーブル・CTE の除外

SQL 解析の結果には、WITH 句(CTE)や一時テーブル名も含まれる場合があります。 実テーブルは スキーマ.テーブル 形式で記録されているため、. を含まない名前をフィルタリングで除外しています。

.filter(col("table_name").contains("."))  # スキーマ名なしは temp / CTE とみなし除外

重複排除の考え方

同一 SQL 内で同じテーブルを複数回参照しても、1日1ユーザー1テーブルを1レコードとして記録します。

# extract_tables 内:同一SQL内の重複を除去
tables = set(extract_tables(r["statement_text"]))

# DataFrame レベル:同日・同ユーザー・同テーブルの重複を除去
.dropDuplicates(["usage_date", "user_name", "table_name"])

全体のデータフロー

system.query.history(Databricks システムテーブル・保持期間 約3ヶ月)
         │
         │ 毎日 22:30 JST(Databricks Jobs)
         ▼
<スキーマ名>.<テーブル名>(Step 1:全履歴を長期蓄積)
         │
         │ 同 Job の後続タスク
         ▼
<スキーマ名>.<テーブル名(参照状況)>(Step 2:テーブル参照状況)

蓄積後の活用イメージ

Step 1(<スキーマ名>.<テーブル名>)から:

  • 実行ユーザー別のクエリ量集計executed_by でグループ化)
  • 重いクエリの特定total_duration_ms の降順ソート)
  • Warehouse の稼働状況把握start_time / end_time の時系列分析)
  • クライアント・アプリケーション別のクエリ実行状況client_application の集計)

Step 2(<スキーマ名>.<テーブル名(参照状況)>)から:

  • テーブルごとの参照頻度ランキング(どのテーブルが最もよく使われているか)
  • ユーザーごとの参照テーブル一覧(誰が何を見ているか)
  • テーブルの参照者特定(廃止・リネーム前の影響範囲調査)
  • アクセスパターンの可視化(日次トレンドの把握)

実際に使ってみて

Step 1 のクエリ履歴蓄積は比較的シンプルに実装できましたが、Step 2 のテーブル参照解析は想定より考慮点が多くありました。

mapInPandas の採用により、sqlglot を使った Python 処理を Spark の分散実行に乗せられた点は想定どおりうまく機能しました。 一方で、CTE やサブクエリの扱いは sqlglot が適切にパースしてくれるため、正規表現アプローチと比べて精度が高く、実運用に耐える結果となっています。

また、蓄積したデータをもとに「特定テーブルの参照者一覧」をすぐに出せるようになり、テーブルの廃止・リネーム検討時の影響範囲調査が格段に楽になりました。

ハマりどころと解決策

① UTC/JST の日付ズレ

最初は to_date("update_time") == current_date() でフィルタリングしていたところ、深夜帯(JST 0〜9時)のクエリが翌日扱いになり、当日分が抜けるケースが発生しました。 from_utc_timestamp("update_time", "Asia/Tokyo") で変換してから比較することで解消しています。

② CTE・一時テーブルが参照テーブルとして混入

sqlglot で解析すると WITH 句で定義した CTE 名もテーブルとして検出されます。 実テーブルは必ず スキーマ.テーブル 形式で記録される(. を含む)ため、.filter(col("table_name").contains(".")) でシンプルに除外できました。

③ 同一テーブルへの複数参照による重複

1つの SQL 内で同じテーブルを JOIN と WHERE の両方で参照するケースなどで重複が発生しました。 set() による SQL 内重複排除と dropDuplicates による DataFrame レベルの重複排除を組み合わせて対処しています。


まとめ

system.query.history の保持期間が約3ヶ月しかなく、棚卸の際に過去のログを振り返ることができないという課題を解決するため、Databricks Jobs を使った2段階の日次処理を構築しました。

この仕組みを導入したことで、以下が実現できています。

  • 蓄積したクエリ履歴ログをいつでも振り返れるようになった
  • テーブルごとの参照者・参照頻度が可視化され、テーブル廃止時の影響調査が容易になった
  • 日次バッチが安定稼働しており、運用コストはほぼゼロ

Databricks でクエリ履歴ログの整理・長期保存を検討しているチームには、ぜひ参考にしていただければと思います。 特に sqlglot + mapInPandas の組み合わせは、SQL テキストを Spark 上で分散解析する際の有力なアプローチです。

今後は MERGE(UPSERT)への移行と、蓄積データを使ったダッシュボード整備を進めていく予定です。


参考

 


*

宮下 竜哉(Tatsuya Miyashita)

データ・AIソリューション本部 データ・AIインフラ統括部 データインフラ部 プラットフォームグループ

前職ではデータ分析および機械学習モデルの開発に従事。2023年1月にパーソルキャリア株式会社へ入社後、社内データ分析基盤の運用・開発を担当。

※2026年3月現在の情報です。




以上の内容はhttps://techtekt.persol-career.co.jp/entry/tech/260331_02より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14