■ Introduction
This post is a continuation of
https://dk521123.hatenablog.com/entry/2023/12/07/060129
This time, I dig deeper into Delete and Insert.
Table of contents
【1】Syntax
【2】Sample
【3】Supplement: processing flow
 1) Actual log
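【1】Syntax
* For delete and insert, combine the following settings:
 + materialized='incremental'
 + incremental_strategy='delete+insert'
* unique_key names the column(s) used to match the existing rows that are deleted before the insert.
Rough example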
{{
config(
materialized='incremental',
unique_key=['user_id' , ...],
incremental_strategy='delete+insert'
)
}}
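To make the role of a multi-column unique_key concrete, here is a minimal sketch of what a delete+insert amounts to, run against an in-memory SQLite database from Python instead of through dbt. The table names (target, staging), the helper delete_insert, and the sample rows are all hypothetical, and the SQL that dbt actually generates depends on the adapter and version.

```python
import sqlite3


def delete_insert(conn, target, staging, key_cols):
    """Delete target rows whose key matches a staging row, then insert all staging rows."""
    # A row is deleted only when ALL key columns match a staging row.
    match = " AND ".join(f"s.{c} = {target}.{c}" for c in key_cols)
    with conn:  # both statements commit together, or roll back together
        conn.execute(
            f"DELETE FROM {target} "
            f"WHERE EXISTS (SELECT 1 FROM {staging} AS s WHERE {match})"
        )
        conn.execute(f"INSERT INTO {target} SELECT * FROM {staging}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (user_id TEXT, day TEXT, score INT)")
conn.execute("CREATE TABLE staging (user_id TEXT, day TEXT, score INT)")
conn.executemany(
    "INSERT INTO target VALUES (?, ?, ?)",
    [("u1", "2023-12-01", 1), ("u1", "2023-12-02", 2), ("u2", "2023-12-01", 3)],
)
conn.executemany(
    "INSERT INTO staging VALUES (?, ?, ?)",
    [("u1", "2023-12-01", 10), ("u3", "2023-12-01", 30)],
)

# Hypothetical composite key, analogous to unique_key=['user_id', 'day'].
delete_insert(conn, "target", "staging", ["user_id", "day"])

rows = conn.execute("SELECT * FROM target ORDER BY user_id, day").fetchall()
for row in rows:
    print(row)
```

In this sketch only the ('u1', '2023-12-01') row is replaced. The other 'u1' row and the 'u2' row are left alone because they do not match on both key columns, and 'u3' is simply added.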
【2】Sample
[0] Sample table definition
DROP TABLE IF EXISTS daily_sample_purchase_history;
CREATE TABLE daily_sample_purchase_history (
  purchase_id VARCHAR NOT NULL,
  item_name VARCHAR NOT NULL,
  number INT NOT NULL,
  created_at DATE NOT NULL
);
[1] Initial data (1st run)
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at)
  VALUES ('Y001', 'Apple', 2, '2023-12-01');
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at)
  VALUES ('Y002', 'Melon', 5, '2023-12-01');

SELECT * FROM daily_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Apple","2","2023-12-01"
"Y002","Melon","5","2023-12-01"
~~~~~~
[3] Definition of the model "models/full_sample_purchase_history.sql" (1st run)
{{
config(
materialized='incremental',
unique_key=['created_at'],
incremental_strategy='delete+insert'
)
}}
WITH
updated_purchase_history AS (
SELECT
purchase_id AS purchase_id,
item_name AS item_name,
number AS number,
created_at::DATE AS created_at
FROM
{{ source('demo_source', 'daily_sample_purchase_history') }}
)
SELECT * FROM updated_purchase_history
[4] dbt run (1st run)
dbt run
[5] Check
SELECT * FROM full_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Apple","2","2023-12-01"
"Y002","Melon","5","2023-12-01"
~~~~~~
[6] Load additional data (2nd run)
-- Clear the data from the 1st run
TRUNCATE TABLE daily_sample_purchase_history;

-- Updated data
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at)
  VALUES ('Y001', 'Orange', 12, '2023-12-01');
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at)
  VALUES ('Y003', 'Pineapple', 21, '2023-12-01');

SELECT * FROM daily_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Orange","12","2023-12-01"
"Y003","Pineapple","21","2023-12-01"
~~~~~~
[7] dbt run (2nd run)
dbt run
[8] Check
SELECT * FROM full_sample_purchase_history ORDER BY purchase_id;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Orange","12","2023-12-01"
"Y003","Pineapple","21","2023-12-01"
~~~~~~
[9] Load additional data (3rd run)
-- Clear the data from the 2nd run
TRUNCATE TABLE daily_sample_purchase_history;

-- Updated data
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at)
  VALUES ('Y001', 'Mango', 23, '2023-12-02');
INSERT INTO daily_sample_purchase_history (purchase_id, item_name, number, created_at)
  VALUES ('Y005', 'Banana', 34, '2023-12-02');

SELECT * FROM daily_sample_purchase_history;
~~~~~~
"purchase_id","item_name","number","created_at"
"Y001","Mango","23","2023-12-02"
"Y005","Banana","34","2023-12-02"
~~~~~~
[10] dbt run (3rd run)
dbt run
[11] Check
SELECT * FROM full_sample_purchase_history ORDER BY purchase_id;
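No output is recorded for this third check, so the following is a sketch that replays the three loads above with plain delete+insert semantics keyed on created_at. It uses an in-memory SQLite database from Python as a stand-in for the real database and for dbt, so it shows what the strategy should leave in the table under that assumption; it is not captured dbt output. TRUNCATE is replaced with DELETE because SQLite has no TRUNCATE.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cols = "(purchase_id TEXT, item_name TEXT, number INT, created_at TEXT)"
conn.execute(f"CREATE TABLE daily_sample_purchase_history {cols}")
conn.execute(f"CREATE TABLE full_sample_purchase_history {cols}")

# The three source loads from steps [1], [6], and [9].
loads = [
    [("Y001", "Apple", 2, "2023-12-01"), ("Y002", "Melon", 5, "2023-12-01")],
    [("Y001", "Orange", 12, "2023-12-01"), ("Y003", "Pineapple", 21, "2023-12-01")],
    [("Y001", "Mango", 23, "2023-12-02"), ("Y005", "Banana", 34, "2023-12-02")],
]

for batch in loads:
    with conn:
        # Reload the source table (stands in for TRUNCATE + INSERT).
        conn.execute("DELETE FROM daily_sample_purchase_history")
        conn.executemany(
            "INSERT INTO daily_sample_purchase_history VALUES (?, ?, ?, ?)", batch
        )
        # Simulated "dbt run": delete every target row whose created_at
        # appears in the new data, then insert the new data.
        # (On the very first run the target is empty, so the delete is a no-op.)
        conn.execute(
            "DELETE FROM full_sample_purchase_history "
            "WHERE created_at IN (SELECT created_at FROM daily_sample_purchase_history)"
        )
        conn.execute(
            "INSERT INTO full_sample_purchase_history "
            "SELECT * FROM daily_sample_purchase_history"
        )

result = conn.execute(
    "SELECT * FROM full_sample_purchase_history ORDER BY created_at, purchase_id"
).fetchall()
for row in result:
    print(row)
```

Under these assumptions the table ends up with four rows: Orange and Pineapple for 2023-12-01 (kept from the 2nd run, because the 3rd load contains no 2023-12-01 rows) plus Mango and Banana for 2023-12-02. It also shows why Y002 (Melon) disappeared in the 2nd run: with created_at as the key, every row for a reloaded date is replaced, not just rows with a matching purchase_id.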
【3】Supplement: processing flow
To check what dbt does behind the scenes, I rewrote the code from
https://dk521123.hatenablog.com/entry/2025/09/04/000906
to use Delete and Insert and ran dbt run --log-level DEBUG to capture the log. It turned out that dbt processes the model as follows.
Processing flow (overview)
Step1: Create a TMP table (user_full__dbt_tmp042824425257)
Step2: Fetch the column information of the original table (user_full) and the TMP table (user_full__dbt_tmp042824425257) (possibly to check that they match?)
Step3: Run DELETE + INSERT against the original table (user_full), using the TMP table (user_full__dbt_tmp042824425257)
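The three steps can be sketched outside dbt as follows, using an in-memory SQLite database from Python. This is a simplified illustration, not dbt's implementation: the temp table name user_full__tmp, the helper column_names, the two-column user_full table, and the sample rows are hypothetical. The sketch uses the fetched column names to build the explicit column list of the INSERT, which matches the shape of the logged statement. Whether dbt additionally validates that the columns match is not confirmed here.

```python
import sqlite3


def column_names(conn, table):
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_full (user_id INT, name TEXT)")
conn.executemany("INSERT INTO user_full VALUES (?, ?)", [(1, "old-a"), (2, "old-b")])

# Step 1: materialize the model's SELECT into a temporary table.
conn.execute(
    "CREATE TEMP TABLE user_full__tmp AS "
    "SELECT 2 AS user_id, 'new-b' AS name UNION ALL SELECT 3, 'new-c'"
)

# Step 2: read the column lists of the TMP table and the original table.
tmp_cols = column_names(conn, "user_full__tmp")
dest_cols = column_names(conn, "user_full")

# Step 3: DELETE + INSERT in one transaction, keyed on user_id.
col_list = ", ".join(dest_cols)
with conn:
    conn.execute(
        "DELETE FROM user_full "
        "WHERE user_id IN (SELECT user_id FROM user_full__tmp)"
    )
    conn.execute(
        f"INSERT INTO user_full ({col_list}) SELECT {col_list} FROM user_full__tmp"
    )

final_rows = conn.execute("SELECT * FROM user_full ORDER BY user_id").fetchall()
print(tmp_cols, dest_cols)
print(final_rows)
```

Here user_id 1 is untouched, user_id 2 is replaced, and user_id 3 is newly inserted.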
1) Actual log
04:28:24 Began running node model.my_project.user_full
04:28:24 2 of 3 START sql incremental model public.user_full ............................ [RUN]
04:28:24 Re-using an available connection from the pool (formerly model.my_project.my_first_dbt_model, now model.my_project.user_full)
04:28:24 Began compiling node model.my_project.user_full
04:28:24 Writing injected SQL for node "model.my_project.user_full"
04:28:24 Began executing node model.my_project.user_full
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */
★ Step1: Create a TMP table (user_full__dbt_tmp042824425257)
create temporary table "user_full__dbt_tmp042824425257"
as
(
WITH
FULL_TABLE AS (
SELECT * FROM "dbt_db"."public"."user_full"
),
USER_TABLE AS (
SELECT * FROM "dbt_db"."public"."user"
),
USER_PII AS (
SELECT * FROM "dbt_db"."public"."user_pii"
),
FINAL_TABLE AS (
SELECT
COALESCE(u.user_id, pi.user_id) AS user_id,
COALESCE(pi.name, f.name) AS name,
COALESCE(u.gender, f.gender) AS gender,
COALESCE(pi.email, f.email) AS email,
COALESCE(u.remarks, f.remarks) AS remarks
FROM
USER_TABLE AS u
FULL OUTER JOIN
USER_PII AS pi
ON
u.user_id = pi.user_id
LEFT OUTER JOIN
FULL_TABLE AS f
ON
u.user_id = f.user_id
OR pi.user_id = f.user_id
)
SELECT * FROM FINAL_TABLE
);
04:28:24 Opening a new connection, currently in state closed
04:28:24 SQL status: SELECT 7 in 0.055 seconds
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: BEGIN
04:28:24 SQL status: BEGIN in 0.000 seconds
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */
★ Step2: Fetch the column information of the original table (user_full)
and the TMP table (user_full__dbt_tmp042824425257)
(possibly to check that they match?)
select
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from INFORMATION_SCHEMA.columns
where table_name = 'user_full__dbt_tmp042824425257'
order by ordinal_position
04:28:24 SQL status: SELECT 5 in 0.020 seconds
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */
select
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from "dbt_db".INFORMATION_SCHEMA.columns
where table_name = 'user_full'
and table_schema = 'public'
order by ordinal_position
04:28:24 SQL status: SELECT 5 in 0.001 seconds
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */
select
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from "dbt_db".INFORMATION_SCHEMA.columns
where table_name = 'user_full'
and table_schema = 'public'
order by ordinal_position
04:28:24 SQL status: SELECT 5 in 0.001 seconds
04:28:24 Writing runtime sql for node "model.my_project.user_full"
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: /* {"app": "dbt", "dbt_version": "1.9.0", "profile_name": "my_project", "target_name": "dev", "node_id": "model.my_project.user_full"} */
★ Step3: Run DELETE + INSERT against the original table (user_full),
using the TMP table (user_full__dbt_tmp042824425257)
delete from "dbt_db"."public"."user_full"
where (
user_id) in (
select (user_id)
from "user_full__dbt_tmp042824425257"
);
insert into "dbt_db"."public"."user_full" ("user_id", "name", "gender", "email", "remarks")
(
select "user_id", "name", "gender", "email", "remarks"
from "user_full__dbt_tmp042824425257"
)
04:28:24 SQL status: INSERT 0 7 in 0.005 seconds
04:28:24 On model.my_project.user_full: COMMIT
04:28:24 Using postgres connection "model.my_project.user_full"
04:28:24 On model.my_project.user_full: COMMIT
04:28:24 SQL status: COMMIT in 0.001 seconds
04:28:24 On model.my_project.user_full: Close
04:28:24 Sending event: {'category': 'dbt', 'action': 'run_model', 'label': 'c3d43d45-e240-42e7-bcec-a93436871f57', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7ffff8b74860>]}
04:28:24 2 of 3 OK created sql incremental model public.user_full ....................... [INSERT 0 7 in 0.15s]
04:28:24 Finished running node model.my_project.user_full
Related articles
dbt ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt ~ Basics / Source ~
https://dk521123.hatenablog.com/entry/2023/12/08/111012
dbt ~ Updates / Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/12/07/060129
dbt ~ Updates / Update or Insert / Insert ~
https://dk521123.hatenablog.com/entry/2023/12/19/224453
dbt ~ Updates / DROP + CTAS ~
https://dk521123.hatenablog.com/entry/2023/12/04/000000