はじめに
イタンジ株式会社のデータチームでマネージャーをしている山崎です。
私のチームで整備してきたデータ基盤は社内のKPIダッシュボードや問い合わせの調査対応で日々使われています。また、基盤側で作った成果物をプロダクトで活用したり、お客様向けのデータをさまざまな形で届けたりするための土台にもなっています。
今回は稼働中のデータベースからBigQueryへデータを集約する仕組みの全体像と、テーブルの規模や更新頻度に合わせて使い分けている3つの転送パターンを紹介します。
システム全体像
イタンジのデータ基盤はAirflowとBigQueryを中心にした構成です。 現在、プロダクトのバックエンドとして稼働しているデータベースからSQLAlchemyを使用してAirflowからデータを抽出し、BigQueryへ集約しています。

データ基盤の規模感
現在の規模感は以下の通りです。
| 項目 | 数値 |
|---|---|
| データ容量 | Active 250GB、LongTerm 920GB |
| 処理ジョブ数 | 約130 |
| 総テーブル数 | 約1,600 |
| BigQuery 日次スキャン量 | 1日平均 5TB |
| クエリ実行回数 | 1日平均 1万回 |
| 日次のデータ流入レコード数 | 1日平均 55億レコード1 |
イタンジのデータ基盤を支える3つのデータ転送パターン
BigQueryへの取り込み要件はテーブルごとに違います。私たちの場合はITANDI BB等のサービスで登録されている物件データやAWSのコスト、HubSpotのコールログなど多岐にわたるデータがあります。これらの同期頻度は月次で足りるものもあれば、15分に1回の同期が必要なものもあります。こうした違いに合わせて3つのパターンを使い分けています。
パターンA: 一括アップロード
テーブルサイズが比較的小さく、同期頻度も毎時程度で足りるものにはこのパターンを使っています。
処理手順
- SQLAlchemyを使い、リードレプリカからレコードを取得する
- ファイルを圧縮してGoogle Cloud Storageへアップロードする
- BigQueryからアップロードしたファイルをロードする
小さいテーブルでもコストは無視できません。運用では次の工夫を入れています。
- メモリ消費が跳ねないように、10,000件ごとに取得して処理を回す
- ローカルで生成したJSONLファイルをGZIPで圧縮してからGCSへ転送し、転送データ量を50〜70%減らす。(結果として処理時間も短くなる)
当初はこの方法でほとんどのテーブルを同期していました。ただ、運用を続けるうちに課題も出てきます。テーブル特性に合わせて、次のパターンも併用しています。
パターンB 分割アップロード 大規模テーブル向け
GB単位、あるいは億単位のレコードを持つ巨大テーブルを扱うためのパターンです。
パターンAのように単一ジョブでアップロードすると、BigQuery側の各種クォータやタイムアウトに引っかかるリスクが高まります。さらに、同期時間が長引き、求められる更新頻度を満たせないケースも出てきました。そこで、次の手順で並列処理しています。
処理手順
- ID範囲の計算: インデックスが貼られた主キーとなるIDカラム たとえば
idやuser_idの最小値と最大値から、パーティション範囲を算出する - 並列実行: Airflowの
TaskGroupを使い、複数のワーカーで各ID範囲のデータを個別にアップロードする - 統合: すべての一時テーブルが揃ったら、最後に1つの本番テーブルへ統合して入れ替える。上書きで運用する
パフォーマンス実績
一例ですが、実運用中の5GBで約500万レコードのテーブルでは一括処理で60分かかっていたところ、8分割の並列処理にして約15分まで短縮しました。
パターンC インクリメンタルアップロード 高頻度更新向け
物件情報など、リアルタイムに近い鮮度が求められるデータに対する差分更新パターンです。
差分更新のためにBigQueryで UPDATE や DELETE を多用すると、テーブル設計や更新範囲しだいで読み取り量が大きくなり、クエリコストが増えやすくなります。そこで、追記していくテーブルと重複を除くビューを組み合わせる方法を採っています。
- 差分抽出: 前回の実行で取得した最大ID、または
updated_at以降に更新されたレコードだけをDBから抽出する - 高頻度実行: 5分間隔などの短サイクルでDAGを回し、BigQueryへ
APPENDモードで追記し続ける - 重複排除: 利用者向けには、
ROW_NUMBER()のウィンドウ関数で最新の1件だけを返すビューを提供する
放置しているとレコードが溜まりクエリコストが高くなってしまいます。定期バッチで物理削除しテーブルをリフレッシュすることで回避しています。
Change Data Capture(CDC)の仕組みを利用しているため厳密には異なるのですが、Google Cloudが提供するDatastreamのAppend-only モードでも実現できます。ただ、Google CloudからAWS上のデータベースへ接続する動線はなるべく増やしたくなかったため、自前で用意した仕組みで回しています。
ただし、この方法を適用できるテーブルには以下の制約があるため注意が必要です。
- プライマリーキーやユニークキーを持つ
updated_atのような更新日時カラムがあり、レコード更新時に必ず更新される- レコードが物理削除されない
最後に
データベースからBigQueryへデータを取り込む方法はテーブルの規模や更新頻度で求められるものが変わります。AirflowならDAGとして処理を組み立てられるので差分の取り方や分割の仕方をPythonで素直に実装でき、運用しながら調整もしやすくなります。今回は触れませんでしたが同期のタイミングでポリシータグを適用し、機密情報に配慮したデータウェアハウスとして運用することができています。(別途記事にするかも)
生データを安定してBigQueryに集約する仕組みは整ってきました。将来的には独自性の高いイタンジの物件データを活用し、お客様に直接価値を提供できるプロダクト作りをしたいと思っていますがまだ道半ばです。 現在は活用しやすいデータ基盤に寄せていくための整備を進めていてdbtを導入中です。次回はその取り組みについて書ければと思います。
- データベースからの全件洗い替えを含む、1日に処理している総レコード数↩