
はじめに
こんにちは、商品基盤部の杉浦、小原、寺嶋です。普段はZOZOTOWNのお気に入り基盤・商品レビュー基盤といった商品サブドメインを担当しています。
私たちのチームでは運用コスト削減を目的として、お気に入りデータベースをオンプレミスのSQL ServerからAWS Aurora MySQLへの移行に取り組んでいます。お気に入りデータは数十億レコードに及び、移行中もデータが増え続けるためデータの静止点が作れないという課題がありました。本記事では、この大規模データ移行における初期移行の取り組みと、Embulkを用いた差分同期について紹介します。
なお、新規データの書き込みを担保するダブルライト戦略については前回の記事で紹介しています。あわせてご覧ください。
目次
お気に入りリプレイスの概要
ZOZOTOWNのお気に入り機能は、会員が興味のある商品・ブランド・ショップを登録し、お気に入り一覧から確認できる機能です。まず、ユーザー種別として会員とゲスト会員の2種類が存在し、それぞれ独立したテーブルで管理されています。お気に入り登録の対象も商品・ブランド・ショップの3種類があり、ユーザー種別との掛け合わせにより、合計6パターンのテーブルが移行対象となります。さらに、過去に削除されたお気に入りの履歴(アーカイブデータ) も保持されており、これらを含めると移行対象のテーブルは多岐に及びます。テーブルによってレコード数は数千万レコードから数十億レコードまで幅があり、合計すると数十億レコード規模のデータ移行となりました。
この構成は長年にわたりZOZOTOWNを支えてきましたが、以下のような課題を抱えていました。
技術スタックの老朽化
ZOZOTOWNは2004年の開始当初からClassic ASP(VBScript)とSQL Serverのストアドプロシージャでビジネスロジックを実装してきました。しかし、VBScriptは開発元のMicrosoftも積極的に開発しておらず、クラウドベンダーのSDKが提供されていないなど技術的な制約が大きくなっていました。こうした背景からZOZOTOWN全体でリプレイスプロジェクトが進められており、お気に入り機能もその一環としてマイクロサービスへの刷新に取り組んでいます。
オンプレミスSQL Serverの運用限界
ZOZOTOWNは運営開始から10年以上にわたりオンプレミス環境でシステムを拡大してきましたが、スケーラビリティや保守コストの面で課題を抱えていました。2017年よりストラングラーフィグパターンによる段階的なマイクロサービス移行が進められています。お気に入り機能のデータベースもその一環として、オンプレミスのSQL ServerからAWS上のAurora MySQLへの移行が必要でした。しかし、以下の制約がありました。
- Read/Writeが常時発生しており、システム停止を伴う移行は不可能
- 書き込んでから読み取れるまでの許容タイムラグが短く、レプリケーション方式では要件を満たせない
- オンプレミスDBへの設定変更が必要なマネージドサービス(AWS DMS等)は、他機能への影響を考慮し使用を見送り
- お気に入りデータが膨大なため、インデックス設定などのチューニングにも数時間を要する状態
これらの課題を踏まえ、移行方式を設計し技術検証しました。移行戦略の全体像は以下の3フェーズで構成されています。
- フェーズ1: SQL Server単体での運用(移行前)
- フェーズ2: SQL ServerとAurora MySQLのデュアル運用(移行期間)
- フェーズ3: Aurora MySQL単体での運用(移行完了)
フェーズ2におけるダブルライトの仕組みやフェーズ切り替えの実装については前回の記事で紹介しています。本記事ではこのフェーズ2にフォーカスします。
背景・課題
初期移行
初期移行は、ソースDB(オンプレミスSQL Server)からターゲットDB(Aurora MySQL)へのデータ一括移行です。全体の流れは以下の通りです。
- 抽出: SQL Serverから
bcpでCSV出力 - 転送: CSVファイルをS3へアップロード
- ロード:
LOAD DATA FROM S3でAurora MySQLへインポート - インデックス構築:
ALTER TABLEでインデックスを追加
制約と課題
今回の初期移行には、以下の制約がありました。
- ソースDB(本番稼働中): 影響を最小限に抑える必要がある
- ターゲットDB(サービスイン前): 大胆な最適化が可能
この非対称な条件から、「抽出は慎重に、インポートは大胆に」という方針を採用しました。抽出にはbcp(Bulk Copy Program)を採用しました。bcpはSQL Server標準のバルクエクスポートツールであり、SELECT文による抽出と比較して以下の利点があります。
- 高スループット: 200,000〜500,000行/秒の安定した出力性能
- シンプルな運用: 追加のミドルウェアやライセンスが不要
転送ではS3を中継することで、ロード失敗時に再抽出せず再実行できる設計としています。
一方、事前試算では最大規模テーブルのインポートに数日〜1週間を要することが判明しました。ロード時間が長期化すると、以下のリスクが高まります。
- 接続切断・タイムアウト: 数日に及ぶ処理は中断リスクが高い
- 障害時の復旧困難: 失敗時のデバッグと再実行に多大な時間を要する
- 移行スケジュールへの影響: ダブルライト期間が長期化し、運用負荷が増大する
- ロールバック困難: 問題発覚時に手戻りできる時間的余裕がなくなる
これらのリスクを軽減するため、インポート処理の最適化が必須でした。
検証と最適化
本番移行に先立ち、約6,000万レコードを持つテーブルを用いて3つの観点で検証しました。
1. 並列化の効果
LOAD DATA FROM S3 MANIFESTでマニフェスト分割による並列実行を検証しました。CSVファイルを4分割・8分割・16分割と変化させましたが、スループットは約51,000〜53,000行/秒で横ばいでした。
今回のAurora構成はProvisioned(単一ライターノード)であり、並列ロードを実行してもCPUおよびストレージI/O帯域がボトルネックとなります。Aurora Serverless v2のような動的スケーリング構成であれば結果が異なる可能性もありますが、今回の構成では並列化による改善は限定的でした。
2. インデックス戦略
| 方式 | 内容 | 処理効率 |
|---|---|---|
| パターンA | インデックスなしでLOAD → 後からALTERで追加 | 約61,000〜68,000行/秒 |
| パターンB | インデックスありでLOAD | 約39,000〜42,000行/秒 |
パターンAが最大59%高速でした。行挿入ごとのインデックス更新はランダムI/Oを発生させますが、一括構築ならソート後、シーケンシャルに処理できます。ターゲットDBは未稼働のため、この最適化を採用しました。
3. インスタンスサイズ
インスタンスタイプ別のスループットを比較しました。料金はAmazon Aurora の料金を参照しています。
| インスタンス | インポート効率 | ALTER効率 | オンデマンド時間単価 |
|---|---|---|---|
| r6i.2xlarge | 約125,500行/秒 | 約120,300行/秒 | 約$0.63/時 |
| r6i.16xlarge | 約162,200行/秒 | 約162,800行/秒 | 約$5.00/時 |
r6i.16xlargeはr6i.2xlargeと比較して約30%のスループット向上が見られた一方、コストは約8倍です。このスループット差がテーブル規模によって処理時間に与える影響は以下の通りです。
- 大規模テーブル(数十億レコード): 2〜3時間の短縮 → リスク低減に寄与
- 小規模テーブル(数千万レコード): 数分の短縮 → コスト対効果が低い
この結果から、大規模テーブルはr6i.16xlargeで時間短縮とリスク低減を図り、中小規模テーブルはr6i.2xlargeでコスト効率を最大化するハイブリッド戦略を採用しました。
本番移行の結果
検証結果をもとに本番移行を実施しました。最終的な移行実績は以下の通りです。
| テーブル規模 | テーブル数 | LOAD DATA | ALTER TABLE | 総所要時間 |
|---|---|---|---|---|
| 最大規模(数十億レコード) | 2 | 約4日 | 約7時間 | 約4日半 |
| 中規模(数億レコード) | 1 | 約3時間 | 約20分 | 約3時間 |
| 小規模(数千万レコード) | 5 | 約1時間 | 約10分 | 約1時間 |
| 合計 | 8 | - | - | 約5日 |
数十時間に及ぶロードでは、以下のクエリで進捗を監視しました。
SET @target_rows = ?; -- 目標件数(テーブルの総行数) SET @thread_id = ?; -- 監視対象のスレッドID SELECT CONCAT('Thread ', trx.trx_mysql_thread_id) AS target_name, CONVERT_TZ(trx.trx_started, 'UTC', 'Asia/Tokyo') AS 開始時刻_JST, ROUND(TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP()) / 3600, 2) AS 経過時間_時間, trx.trx_rows_modified AS 挿入済み行数, @target_rows AS 目標件数, ROUND(trx.trx_rows_modified / TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP()), 1) AS スループット_行毎秒, ROUND(trx.trx_rows_modified / @target_rows * 100, 2) AS 進捗率_パーセント, ROUND( (@target_rows - trx.trx_rows_modified) / (trx.trx_rows_modified / TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP())) / 3600, 2 ) AS 残り時間_時間, DATE_ADD( CONVERT_TZ(NOW(), 'UTC', 'Asia/Tokyo'), INTERVAL ROUND( (@target_rows - trx.trx_rows_modified) / (trx.trx_rows_modified / TIMESTAMPDIFF(SECOND, trx.trx_started, UTC_TIMESTAMP())) ) SECOND ) AS 完了見込み時刻_JST FROM information_schema.innodb_trx trx WHERE trx.trx_mysql_thread_id = @thread_id;
information_schema.innodb_trxのtrx_rows_modifiedから処理済み件数を取得し、経過時間で割ってスループットを算出します。目標件数との差分から残り時間と完了見込み時刻を推定し、数日に及ぶ処理においても見通しを立てられるようにしました。
得られた学び
| 学び | 根拠 |
|---|---|
| 並列化は万能ではない | マニフェスト分割を試みたが、単一ノードのI/O帯域がボトルネックとなり効果は限定的でした。闇雲に並列化するのではなく、律速段階を特定することが重要です |
| インデックスは後付けが基本 | ロード後に一括構築することで最大59%高速化。行挿入ごとのインデックス更新はランダムI/Oを発生させるが、一括構築ならソート後シーケンシャルに処理できる |
| インスタンスサイズはテーブル規模で使い分ける | 大規模テーブルはr6i.16xlargeで時間短縮とリスク低減、小規模テーブルはr6i.2xlargeでコスト効率を最大化。スループット向上率とコスト増加率のバランスを見極める |
| 必ず本番同等データでリハーサルする | 6,000万レコードでの検証結果を数十億レコードに線形外挿すると誤差が生じる。I/Oやメモリの振る舞いはデータ規模で変化するため、全量リハーサルが不可欠 |
| やり直せる設計が安心を生む | S3を中継することでロード失敗時も再抽出不要で再実行できる。数日かかる処理では「失敗しても復旧できる」という安心感が運用の質を高める |
この工程が安定したことで、後続の増分同期フェーズへ安全に進められました。
Embulkによる差分同期
初期移行が完了した後も、オンプレミスのSQL Serverには新規データが書き込まれ続けます。この増加分をAurora MySQLへ反映するため、Embulkを用いた差分同期の仕組みを構築しました。

図中の「マスタ」はマイクロサービスがSQL Serverをマスタ(書き込みの主系)として参照・更新することを示しています。「非同期」はマイクロサービスがSQL Serverと同じ結果をAurora MySQLへ非同期に反映されることを示しています。「保存」はEmbulkジョブ完了後に差分の起点となる状態(config-diff)をS3へアップロードすることを指しています。「復元」は次回ジョブ起動時にS3からその状態をダウンロードすることを指しています。これにより前回の続きから差分取得を再開できます。
ジョブ設計
Embulkのインクリメンタル同期では、updated_atのような更新日時カラムを差分キーとして利用するのがベストプラクティスです。しかし、今回の移行元テーブルはInsert/Deleteのみの操作で設計されており、レコードの更新(Update)が発生しないためupdated_atに相当するカラムが存在しません。このテーブルの特性を踏まえ、操作種別ごとに差分キーを使い分ける設計を採用しました。
1つのテーブルに対して役割の異なる最大3つのジョブを用意しています。
| ジョブ種別 | インクリメンタル列 | 対象レコード |
|---|---|---|
| 通常ジョブ | 登録日(registered_at) |
新規追加されたレコード |
| 削除ジョブ | 削除日(deleted_at) |
論理削除されたレコード |
| アーカイブジョブ | 連番ID | 削除テーブルへ移動済みのレコード |
通常ジョブは登録日、削除ジョブは削除日をそれぞれ基準にレコードを取得します。
-- 通常ジョブ WHERE registered_at >= :registered_at -- 削除ジョブ WHERE deleted_at IS NOT NULL AND deleted_at >= :deleted_at
アーカイブジョブでは、Embulkのbefore_loadとafter_loadフックを活用し、以下の3ステップを1つのジョブ内で完結させています。
out: mode: merge_direct before_load: > UPDATE watermark SET id = (SELECT COALESCE(MAX(id), 0) FROM archived_favorites) after_load: > DELETE FROM favorites WHERE EXISTS ( SELECT 1 FROM archived_favorites WHERE archived_favorites.favorite_id = favorites.id AND archived_favorites.id >= (SELECT id FROM watermark) )
before_loadでロード前のアーカイブテーブルの最大IDをウォーターマークとして記録し、after_loadでウォーターマーク以降の新規アーカイブ分に対応するお気に入りレコードを物理削除します。ウォーターマークがなければアーカイブテーブル全レコードが削除対象となり、毎回全件スキャンが発生します。ウォーターマークにより、今回のジョブで追加された差分だけに処理を限定しています。この設計により、お気に入り商品・ブランド・ショップの各テーブルに対してゲスト・会員の2種類を掛け合わせた複数パターンの差分同期を体系的に管理しています。
ソースDBへの負荷制御
差分同期では稼働中のオンプレミスSQL Serverからデータを読み取ります。本番サービスへの影響を抑えるため、複数のパラメータで負荷を制御しました。
# 共通入力設定(抜粋) in: type: sqlserver transaction_isolation_level: NOLOCK # ロック競合を回避 fetch_rows: 1000 # メモリ消費を抑制
SELECT TOP 10000 -- 1回あたりの取得行数を制限 registered_at, id, member_id, ... FROM favorites WITH (NOLOCK) WHERE registered_at >= :registered_at ORDER BY registered_at OPTION (MAX_GRANT_PERCENT = 25) -- クエリのメモリグラント上限を設定
NOLOCKヒントでロック競合を回避し、TOP N句で1回あたりの取得行数を制限しています。fetch_rowsでJDBCのフェッチサイズを制御し、MAX_GRANT_PERCENTでSQL Serverのクエリメモリグラント上限を設定しました。
また、embulk-input-sqlserverのインクリメンタルロードでは、対応する列型が整数型・文字列型・datetime2型に限定されています。しかし、移行元テーブルの日時カラムはsmalldatetime型であり、そのままではインクリメンタル列として使用できません。この制約の回避策として、クエリ内でCAST(削除日カラム AS DATETIME)と明示的に型変換しています。
データ整合性の担保
差分取得では > ではなく >= を使用しています。> の場合、同一タイムスタンプに複数レコードが存在すると一部を取りこぼすリスクがあります。>= では前回の最終レコードを重複取得する可能性があります。しかし、Embulkの出力モードをmerge_directに設定すれば、重複分はUPSERTとして吸収されます。
out: mode: merge_direct
「取りこぼし」と「重複」のトレードオフにおいて、重複を許容しつつ冪等性で吸収する方針を採用しました。
差分の起点となる状態管理にも工夫が必要でした。Embulkは--config-diffオプションにより、前回処理の最終レコード(last_record)をYAMLファイルに記録します。
in: last_record: ['2023-12-23T09:00:30.000000'] out: {}
しかし、Kubernetes Jobとして実行する場合、Podはジョブ完了後に破棄されます。ローカルファイルシステム上の差分状態は失われるため、S3に永続化する仕組みを構築しました。
- ジョブ開始時にS3から前回の差分状態をダウンロード
- Embulkによる差分同期の実行と差分状態の更新
- ジョブ完了時に更新された差分状態をS3にアップロード
ここで、ダウンロードとアップロードの失敗は致命的エラーとしてジョブを失敗させます。
設定管理とチューニング
複数パターンの設定ファイルは、対象テーブルやカラム名が異なるものの接続情報やパラメータは共通しています。EmbulkのLiquidテンプレート機能を活用し、共通部分を3つのテンプレートに集約しました。
| 共通テンプレート | 役割 |
|---|---|
| 入力設定 | SQL Server接続情報、トランザクション分離レベル、フェッチサイズ |
| 出力設定 | MySQL接続情報、出力モード |
| SELECT句生成 | 環境変数に基づくTOP N句の条件付き生成 |
個別の設定ファイルでは共通テンプレートをインクルードし、テーブル名・カラム名・WHERE句のみを定義します。SELECT句の共通テンプレートでは、環境変数が未設定の場合はTOP句自体を生成せず、設定されている場合のみ行数制限を付与する条件分岐を実現しています。これにより、本番環境では制限なし、検証環境では制限ありという切り替えが可能です。
負荷制御パラメータ(TOP N、fetch_rows、MAX_GRANT_PERCENT等)もすべて環境変数に切り出しており、コンテナイメージの再ビルドなしに変更を反映できます。テーブル単位で処理時間を計測してボトルネックを特定し、検証環境での調整結果を本番環境へ反映するサイクルを効率的に回せる設計です。
まとめ
本記事では、ZOZOTOWNのお気に入りデータベースにおける数十億レコード規模のデータ移行について、初期移行の最適化とEmbulkを用いた差分同期の取り組みを紹介しました。
初期移行では、インデックスの後付けやテーブル規模に応じたインスタンスサイズの使い分けにより、約5日間で全テーブルの移行を完了しました。差分同期では、updated_atカラムが存在しない制約に対し、役割の異なる複数ジョブを設計することで、サービス無停止のまま増分データの反映を実現しました。
大規模データ移行やEmbulkによる異種DB間の差分同期を検討されている方にとって、本記事が参考になれば幸いです。今後はAurora MySQL単体運用への切り替えを進め、お気に入り機能のマイクロサービス化を完遂していきます。
ZOZOでは、一緒にサービスを作り上げてくれる方を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください。