
LayerX BizOps 部データグループのさえない (@saeeeeru) です。最近は娘と『名探偵プリキュア!』にハマっています。「自分で見て、感じて、考えて、"本当"の答えを出す」。AI 時代だからこそ刺さるメッセージです(推理パートをちゃんと解けるようになりたい)。
前回の記事では、dbt Python model から外部 API を呼び出す実装パターンを紹介しました。今回はその応用として、LLM の Web Search 機能を使って公開情報を取得し、それをデータパイプラインに組み込む実践例を書きます。
この記事では、まず LLM の Web Search 機能をどう使うとデータパイプラインに載せやすい形になるのか を説明し、そのうえで Snowflake / dbt にどう載せたのか、そして本番運用の中でどんな品質課題が見えてきたのか、という順に整理します。
Web Search を Snowflake / dbt のパイプライン設計にどう載せるか
この記事の技術が必要になった背景には、分析基盤である Snowflake にない外部の公開情報を継続的に取り込みたい、というモチベーションがありました。たとえば、既存の属性情報だけでは判断材料が不足し、企業サイトやニュースリリースのような一次情報を補助的に参照したくなるケースがあります。
こうした情報を毎回人手で見に行く運用は継続しづらい一方で、自然言語のまま取得しても構造化データではないためデータ処理対象として扱いづらいです。この章では、外部 Web 上の公開情報をどう取得し、どうすれば Snowflake / dbt のパイプラインで扱える形にできるか を説明します。
外部 Web Search の実装パターン
外部の公開情報を取得する手段として、まず古典的なスクレイピングがあります。しかし、企業サイト・ニュースリリース・メディア記事など多様なソースの Web ページを対象にする場合、サイトごとにパーサーを書いて構造化するのは現実的ではありません。取得したい情報が「資金調達をしたか」「事業内容は何か」といった、自然言語の意味を解釈したうえでの抽出である以上、LLM を介する必要がありました。
LLM を使って外部の公開情報を取得・構造化する場合、実装パターンは大きく 2 つあります。
- 検索 API と LLM API を分けるパターン : 検索と要約・抽出を別々の API で組み合わせる
- Web Search を内包した LLM API を使うパターン : 検索と応答生成を 1 つの API でまとめて扱う
今回の設計では、検索から構造化までを LLM に任せて実装と運用をシンプルに保ちたかったため、後者の Web Search を内包した LLM API を採用しました。具体的には Azure OpenAI の Responses API + web_search_preview を使っています。
ただ、LLM の応答を自然文のまま返させるだけでは後続のデータ処理につなぎにくいため、Responses API の応答は JSON として返させるよう設計しました。Snowflake 上ではまず半構造化データとして保持し、必要な情報を後段で扱いやすい形に整えていきます。次の節では、「何を抽出し、どの粒度で持つか」という出力スキーマの設計を説明します。
スキーマ設計の重要性
スキーマの具体的な定義はユースケースによって異なりますが、重要なのは以下の 2 つの補助情報を含めることです。 Web Search を使っても誤りは残るため、重要なのは「誤りをなくすこと」よりも、「後から確認ができる情報を残すこと」だと考えています。
confidence:high/medium/lowの 3 段階で、モデル自身の確信度を自己申告させるevidence: 出典 URL と該当箇所のスニペットを配列で返させる
これにより、利用者が情報を鵜呑みにせず、「根拠を確認してから判断する」運用を組み込みやすくなります。confidence はあくまで自己申告であり、evidence も正しさそのものを保証するわけではありませんが、少なくとも確認を始める手がかりは残せます。
ここまでが、「Web Search をどう実装し、データパイプラインに載せるにはどんな出力スキーマが必要か」という話です。続いて、その処理を Snowflake / dbt の中でどう実装・運用したかを見ていきます。
なぜ dbt Python model で LLM API を呼び出す構成にしたのか
今回の本番構成では、dbt Python model から LLM API を呼び出す形を採用しました。
正直なところ、一番大きかったのは、Snowflake / dbt の既存パターンの延長として、データエンジニアが実装・運用しやすかったことがあります。
LayerX のデータ基盤では、各種 SaaS API を呼び出す dbt Python model の実装がすでに多数あります(前回の記事で紹介したパターン)。そのため、LLM API の呼び出しも同じパターンに載せるのが自然でした。
全体像は次の通りです。
graph TD
subgraph Snowflake
MASTER["マスターデータ"]
subgraph dbt["dbt レイヤー構成"]
PY["dbt Python model<br/>対象レコードの抽出 / API 呼び出し / JSON 格納"]
LND["landing<br/>取得直後の RAW_JSON"]
SRC["src<br/>不正データ除外 / 取得手段の隠蔽"]
DWH["dwh<br/>再利用を意識した再構成"]
MART["mart<br/>用途特化の利用データ"]
end
end
subgraph RESP["Responses API"]
API["Web Search<br/>+ JSON 応答"]
end
MASTER --> PY
PY -- "Snowflake から<br/>外部 API へ接続" --> API
API -- "JSON 応答" --> PY
PY --> LND
LND --> SRC --> DWH --> MART
MASTER -.-> DWH
MASTER -.-> MART
ここでいうデータのレイヤーは、社内では大きく landing、src、dwh、mart に分かれています。役割としては、landing は取得直後のデータを保持する層、src は取得手段を隠蔽しつつほぼ生データとして扱いやすくする層、dwh は再利用を意識して再構成する層、mart は用途特化で利用する層です。今回の記事で主に扱うのは、landing -> src までです。
このように、dbt に載せておくと、Snowflake 上の既存データを参照しながら、Responses API の呼び出し対象を絞り込めます。これはコストを抑えるのに効くため、後述する Incremental 戦略の前提になります。それに加えて、 Snowflake 上の他のユースケースにも展開しやすいという見通しもありました。
実装: dbt Python model × LLM API (Web Search)
dbt Python model の実装
この節では dbt Python model 側の実装を紹介しますが、Snowflake に寄った実装詳細になっています。
コードのポイントを先に整理しておきます。
- 並列実行: Responses API は 1 リクエストあたり 10〜30 秒かかるため、
ThreadPoolExecutor(max_workers=10)で並列化しています。10は Snowflake 上での実行負荷と API 側の待ち時間を見ながら調整した値です - Incremental:
dbt.is_incremental+leftantijoin で、直近 N 日以内に処理済みのレコードをスキップします。N の値は取得対象の情報の更新頻度を意識して設計します - リトライ: JSON パースエラーと API エラー(429 等)を Exponential backoff で共通処理しています
Note: 認証情報は Snowflake の Generic Secret + External Access Integration で管理しています。また、
dbt.ref()とdbt.is_incrementalは、どちらも dbt が組み込みで用意している参照・条件分岐のための機能です
# landing レイヤー : dbt Python model の疑似コード import json import logging import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone import _snowflake import pandas as pd import requests from snowflake.snowpark import Session from snowflake.snowpark.functions import col # ユースケースに応じた JSON Schema を定義 # 構造化出力オプションに渡し、出力形式を一定範囲で制約する OUTPUT_SCHEMA = { "type": "json_schema", "json_schema": { "name": "web_search_result", "strict": True, "schema": { # ... ユースケースに応じたスキーマ定義 ... }, }, } SYSTEM_PROMPT = "ここにプロンプトを入れる" MAX_WORKERS = 10 MAX_RETRIES = 3 BACKOFF_BASE_SECONDS = 1 SKIP_DAYS = 30 def build_payload(input_text: str) -> dict: return { "input": input_text, "instructions": SYSTEM_PROMPT, "tools": [{"type": "web_search_preview"}], "text": {"format": OUTPUT_SCHEMA}, } def extract_output_text(response_json: dict): """ Responses API のレスポンスから JSON 文字列を取り出す。 実際のレスポンス構造は SDK / API バージョンに合わせて実装する。 """ for item in response_json.get("output", []): if item.get("type") != "message": continue for content in item.get("content", []): if content.get("text"): return content["text"] return None def call_llm_api(input_text: str, api_key: str, endpoint: str, deployment: str) -> dict: """1 件分の Web 検索 + 構造化出力を取得する。""" # 実際の URL 形式は利用中の API に合わせて組み立てる url = f"{endpoint}/openai/deployments/{deployment}/responses" headers = {"api-key": api_key, "Content-Type": "application/json"} payload = build_payload(input_text) for attempt in range(MAX_RETRIES): try: resp = requests.post(url, headers=headers, json=payload, timeout=60) resp.raise_for_status() output_text = extract_output_text(resp.json()) if not output_text: logging.warning( json.dumps( { "event_name": "web_search_no_message", "input_text": input_text, "attempt": attempt, } ) ) if attempt < MAX_RETRIES - 1: time.sleep(BACKOFF_BASE_SECONDS * (2**attempt)) continue return {"error": "no_message_in_response", "_input": input_text} parsed = json.loads(output_text) parsed["_input"] = input_text return parsed except json.JSONDecodeError as e: logging.warning( json.dumps( { "event_name": "web_search_json_parse_error", "input_text": input_text, "attempt": attempt, "error": str(e)[:500], } ) ) if attempt < MAX_RETRIES - 1: time.sleep(BACKOFF_BASE_SECONDS * (2**attempt)) continue return {"error": f"json_parse_error: {str(e)[:500]}", "_input": input_text} except Exception as e: logging.warning( json.dumps( { "event_name": "web_search_call_failed", "input_text": input_text, "attempt": attempt, "error": str(e)[:500], } ) ) if attempt < MAX_RETRIES - 1: time.sleep(BACKOFF_BASE_SECONDS * (2**attempt)) continue return {"error": str(e)[:500], "_input": input_text} return {"error": "max retries exceeded", "_input": input_text} def model(dbt, session: Session): dbt.config( materialized="incremental", incremental_strategy="append", packages=["requests"], external_access_integrations=["your_access_integration"], secrets={"api_key": "your_api_key_secret_name"}, ) # --- 対象レコードの抽出 --- source_df = ( dbt.ref("input_list") .select(col("key").alias("input_key")) .distinct() ) if dbt.is_incremental: existing_df = session.table(f"{dbt.this}").select("input_key", "enriched_at") recent_existing_df = existing_df.filter( f"enriched_at >= dateadd('day', -{SKIP_DAYS}, current_timestamp())" ) source_df = source_df.join( recent_existing_df.select("input_key"), on="input_key", how="left_anti", ) targets = source_df.to_pandas() if targets.empty: return session.create_dataframe( [], schema=["input_key", "raw_json", "enriched_at"], ) # --- API 呼び出し(並列実行) --- api_key = _snowflake.get_generic_secret_string("api_key") endpoint = "https://your-llm-endpoint.example.com" deployment = "your-deployment" results = [] with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = { executor.submit(call_llm_api, row["INPUT_KEY"], api_key, endpoint, deployment): row["INPUT_KEY"] for _, row in targets.iterrows() } for future in as_completed(futures): input_key = futures[future] try: results.append(future.result()) except Exception as e: results.append({"error": str(e), "_input": input_key}) # --- 結果を DataFrame に変換 --- enriched_at = datetime.now(timezone.utc) rows = pd.DataFrame( [ { "input_key": r.get("_input", ""), "raw_json": json.dumps(r, ensure_ascii=False), "enriched_at": enriched_at, } for r in results ] ) return session.create_dataframe(rows)
landing → src: RAW_JSON の構造化
dbt Python model の出力は、landing レイヤーに RAW_JSON として Incremental append されます。ここではまず取得直後の半構造化データを保持し、下流で扱いやすくするために src レイヤーで PARSE_JSON を使って必要なキーを抽出します。つまり、landing で取得結果そのものを残し、src で不正データの除外や取得手段の隠蔽を行いながら、ほぼ生データとして扱える形に寄せる、という分担になっています。
-- src レイヤー: RAW_JSON からキーを抽出する疑似コード with import_landing as ( select input_key, raw_json, enriched_at from {{ ref('landing__azure_openai__web_search') }} ), logic_parse_json as ( select input_key, parse_json(raw_json) as raw_json, enriched_at from import_landing ), logic_parsed as ( select input_key, raw_json:confidence::varchar as confidence, raw_json:evidence::variant as evidence, -- ユースケース固有のキーをここで抽出 -- raw_json:your_key::varchar as your_column, enriched_at from logic_parse_json where raw_json:error is null ) select * from logic_parsed
こうすることで、下流の dwh レイヤーでは、通常の dbt SQL model として既存のテーブルと JOIN しながら、再利用を意識した形に再構成できます。用途特化の加工が必要であれば、その先の mart で受けることもできます。LLM パイプラインの出力を、他の外部ソース由来データと同じようにデータ基盤の標準レイヤーの中で扱いやすくする、というのがこの構成の狙いです。
PoC から本番運用で見えてきたこと
PoC の進め方と Go/No-Go 判断
LLM API の Web Search を組み込んだパイプラインは、プロンプトを書いただけでは品質が読めません。本番投入の前に、少数サンプルで「このユースケースに対して実用になるか」を判断する PoC フェーズを設けました。
検証プロセス
- 少数サンプルで実行: まず数件分だけ Snowflake 上で実行し、出力を目視確認します。JSON のパースが通るか、期待したキーにそれらしい値が入っているかを手で見るフェーズです
- 品質指標の設計: 目視確認で「いけそうだ」となったら、Go/No-Go を判断するための定量指標を定義します。今回は以下の 3 つを設定しました
- 情報取得成功率: API エラーや JSON パースエラーなく情報を返せるか
- 出典確認成功率: 出典 URL を実際に開いて、参照先が有効であり、出力内容と整合しているかを人手で確認
- コスト: 1 リクエストあたりの API コストが許容範囲に収まるか
- Go/No-Go 判断: 各指標に閾値を設け、全指標が閾値を満たせば本番移行。満たさなければプロンプト改善 or 設計見直し
PoC の結果はいずれの指標も良好で、本番運用に移行しました。
本番運用で顕在化した課題
PoC では良好だった品質も、本番運用で数百件を処理すると想定していなかったパターンが顕在化してきました。代表的だったのは、confidence=high かつ evidence 付きでも、実際には無効な URL を根拠として返すケースです。
これは、引用した根拠自体が有効でないという引用根拠の妥当性の問題で、プロンプトの改善(「推測で補完しないこと」「根拠が取れない場合は unknown を返すこと」等)である程度は抑制できますが、プロンプトだけでゼロにはできないというのが現時点の実感です。次のステップとして、Snowflake AI Observability(SnowflakeでAI Observabilityを実現する)を使った出力品質の定量評価や、Human-in-the-Loop によるフィードバック収集の仕組みを検討しています。
まとめ
dbt の中心的な役割は変換処理ですが、dbt Python model を使うと、外部データの取得から下流の整形までを同じデータパイプラインの中で扱いやすくなります。今回のパターンは、LLM API の Web Search で取得したデータを Snowflake に蓄積し、後続のモデルで使える形に整えていく用途に向いています。こうして公開情報が継続的にデータ基盤へ載るようになると、後続ユースケースでも毎回個別に外部情報を取りに行かずに済むようになり、判断材料のばらつきを減らすことにつながります。
今後は、本番運用で見えてきた品質課題に対して Snowflake AI Observability による定量評価の仕組みを整備し、出力品質が担保されたデータパイプラインへ進化させていく予定です。
AI-Ready なデータ基盤を、事業成果に直結する形で作る挑戦に興味を持った方は、Data Enablingチームとお話ししましょう。アナリティクスエンジニア、データエンジニアのカジュアル面談はこちらから 👇