◾️Introduction
I learned that incremental_strategy can be customized for incremental updates in dbt, so I am noting it down here.
Table of Contents
【1】Steps to create a custom incremental_strategy
 Step1: Create the macro "get_incremental_【STRATEGY】_sql"
 Step2: Call it with incremental_strategy: 【STRATEGY】
【2】arg_dict
 1)target_relation
 2)temp_relation
 3)unique_key
 4)dest_columns
 5)incremental_predicates
【3】Other Tips
 1)get_quoted_csv
【4】Sample
【1】Steps to create a custom incremental_strategy
The procedure is described in
https://docs.getdbt.com/docs/build/incremental-strategy#custom-strategies
Only Step1 and Step2 below are required.
Step1: Create the macro "get_incremental_【STRATEGY】_sql"
* Create a macro named "get_incremental_【STRATEGY】_sql" => replace the 【STRATEGY】 part with your own strategy name
macros/my_custom_strategies.sql
-- This example uses "insert_only" as the strategy name.
{% macro get_incremental_insert_only_sql(arg_dict) %}
  {% do return(some_custom_macro_with_sql(
    arg_dict["target_relation"],
    arg_dict["temp_relation"],
    arg_dict["unique_key"],
    arg_dict["dest_columns"],
    arg_dict["incremental_predicates"])) %}
{% endmacro %}

{% macro some_custom_macro_with_sql(
    target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %}
  {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
  insert into {{ target_relation }} ({{ dest_cols_csv }})
  (
    select {{ dest_cols_csv }}
    from {{ temp_relation }}
  )
{% endmacro %}
Step2: Call it with incremental_strategy: 【STRATEGY】
models/my_model.sql
{{ config(
materialized="incremental",
incremental_strategy="insert_only",
...
) }}
...
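To make Step2 concrete, here is a minimal sketch of a complete model that uses the "insert_only" strategy from Step1. The model name, the upstream model "raw_events", and the columns "event_id" and "event_at" are hypothetical and only serve as an illustration.

```sql
-- models/my_events.sql (hypothetical model name)
{{ config(
    materialized="incremental",
    incremental_strategy="insert_only"
) }}

select
    event_id,   -- hypothetical column
    event_at    -- hypothetical column
from {{ ref('raw_events') }}  -- hypothetical upstream model
{% if is_incremental() %}
-- On incremental runs, only pick up rows newer than what the target already holds
where event_at > (select coalesce(max(event_at), '1900-01-01') from {{ this }})
{% endif %}
```

Because "insert_only" never deletes or updates, a filter like the one inside is_incremental() is what keeps already loaded rows from being inserted again.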
【2】arg_dict
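* Notes on the arguments the macro receives, e.g. arg_dict["target_relation"]
1)target_relation
* The table the model writes to (e.g. demo_schema.demo_table)
2)temp_relation
* The temporary table that dbt creates to hold the new or changed rows
3)unique_key
* The column name, or list of column names, specified in the config
4)dest_columns
* List of column information for the destination (target) table
5)incremental_predicates
* Additional conditions for use in a WHERE clause (e.g. ["update_at >= '2025-10-01'"]) => the value of incremental_predicates in the config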
https://docs.getdbt.com/docs/build/incremental-strategy#about-incremental_predicates
Example
{{
config(
materialized = 'incremental',
...
incremental_predicates = [
"DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"
]
)
}}
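In a custom strategy, the predicates are only applied if the macro itself writes them into the SQL. The following is a rough sketch of one way to do that, assuming a hypothetical strategy name "purge_insert" and assuming the warehouse accepts a table alias in DELETE (this varies by adapter). It is not how the built-in strategies are implemented.

```sql
{# macros/get_incremental_purge_insert_sql.sql (hypothetical file and strategy name) #}
{% macro get_incremental_purge_insert_sql(arg_dict) %}
  {%- set target_relation = arg_dict["target_relation"] -%}
  {%- set temp_relation = arg_dict["temp_relation"] -%}
  {%- set predicates = arg_dict["incremental_predicates"] -%}
  {%- set dest_cols_csv = get_quoted_csv(arg_dict["dest_columns"] | map(attribute="name")) -%}

  {# A single predicate may arrive as a plain string; wrap it so join() works on a list #}
  {%- if predicates is string -%}
    {%- set predicates = [predicates] -%}
  {%- endif -%}

  {# incremental_predicates is None when the config does not set it #}
  {% if predicates %}
    {# The alias here must match the one used inside the predicate strings #}
    delete from {{ target_relation }} as DBT_INTERNAL_DEST
    where {{ predicates | join(" and ") }};
  {% endif %}

  insert into {{ target_relation }} ({{ dest_cols_csv }})
  (
    select {{ dest_cols_csv }}
    from {{ temp_relation }}
  )
{% endmacro %}
```

With the config example above, this sketch would first delete the target rows from the last 7 days and then insert everything from the temporary table. The point is that an alias such as DBT_INTERNAL_DEST only exists if your own SQL defines it.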
【3】Other Tips
1)get_quoted_csv
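* Converts a list of column names into a comma-separated string of quoted names => it is defined among dbt's built-in macros (the same source also contains "diff_columns" and "diff_column_data_types")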
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
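A quick way to see what get_quoted_csv returns is to call it from a small throwaway macro. The macro name "debug_quoted_csv" and the column names below are hypothetical.

```sql
{# macros/debug_quoted_csv.sql (hypothetical file and macro name) #}
{% macro debug_quoted_csv() %}
  {# A plain list of column names, as produced by dest_columns | map(attribute="name") #}
  {%- set names = ["user_id", "name", "email"] -%}
  {%- set csv = get_quoted_csv(names) -%}
  {# info=True prints the message to the console as well as the log file #}
  {{ log("quoted csv: " ~ csv, info=True) }}
{% endmacro %}
```

This can be run with `dbt run-operation debug_quoted_csv`. The quoting character comes from the adapter, so the exact output depends on the warehouse.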
【4】Sample
macros/get_incremental_delete_insert_ex.sql
-- This example uses "delete_insert_ex" as the strategy name.
{% macro get_incremental_delete_insert_ex_sql(arg_dict) %}
  {% do return(default__get_incremental_delete_insert_ex_sql(
    arg_dict["target_relation"],
    arg_dict["temp_relation"],
    arg_dict["unique_key"],
    arg_dict["dest_columns"],
    arg_dict["incremental_predicates"])) %}
{% endmacro %}

{% macro default__get_incremental_delete_insert_ex_sql(
    target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %}
  {# Log every argument to see what dbt passes in #}
  {{ log("target_relation: " ~ target_relation, True) }}
  {{ log("temp_relation: " ~ temp_relation, True) }}
  {{ log("unique_key: " ~ unique_key, True) }}
  {{ log("dest_columns: " ~ dest_columns, True) }}
  {{ log("incremental_predicates: " ~ incremental_predicates, True) }}
  {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
  {{ log("dest_cols_csv: " ~ dest_cols_csv, True) }}

  {% if unique_key %}
    {# datetime.now() is used because date.today() has no time part, so its %H%M%S is always 000000 #}
    {%- set delete_map = 'TMP___' ~ modules.datetime.datetime.now().strftime('%Y%m%d%H%M%S') -%}

    {# Collect the distinct keys of the new rows. #}
    {# Note: "create or replace table" is not available on every warehouse (e.g. PostgreSQL). #}
    create or replace table {{ delete_map }} as (
      select distinct
      {% if unique_key is sequence and unique_key is not string %}
        {% for key in unique_key %}
        {{ key }}{{ "," if not loop.last }}
        {% endfor %}
      {% else %}
        {{ unique_key }}
      {% endif %}
      from {{ temp_relation }}
    );

    {# Delete the target rows whose keys appear in the new data #}
    {% if unique_key is sequence and unique_key is not string %}
      delete from {{ target_relation }}
      using {{ delete_map }}
      where (
        {% for key in unique_key %}
        {{ delete_map }}.{{ key }} = {{ target_relation }}.{{ key }}
        {{ "and " if not loop.last }}
        {% endfor %}
        {% if incremental_predicates %}
          {% for predicate in incremental_predicates %}
          and {{ predicate }}
          {% endfor %}
        {% endif %}
      );
    {% else %}
      delete from {{ target_relation }}
      where ({{ unique_key }}) in (
        select ({{ unique_key }})
        from {{ delete_map }}
      )
      {%- if incremental_predicates %}
        {% for predicate in incremental_predicates %}
        and {{ predicate }}
        {% endfor %}
      {%- endif -%};
    {% endif %}

    drop table if exists {{ delete_map }};
  {% endif %}

  {# Insert all new rows #}
  insert into {{ target_relation }} ({{ dest_cols_csv }})
  (
    select {{ dest_cols_csv }}
    from {{ temp_relation }}
  )
{% endmacro %}
models/user_full.sql
{{
config(
materialized='incremental',
unique_key='user_id',
incremental_strategy='delete_insert_ex'
)
}}
WITH
FULL_TABLE AS (
{% if is_incremental() %}
SELECT * FROM {{ this }}
{% else %}
SELECT
NULL AS user_id,
NULL AS name,
NULL AS gender,
NULL AS email,
NULL AS remarks
{% endif %}
),
USER_TABLE AS (
SELECT * FROM {{ ref('user') }}
),
USER_PII AS (
SELECT * FROM {{ ref('user_pii') }}
),
UNION_TABLE AS (
SELECT
COALESCE(u.user_id, pi.user_id) AS user_id,
COALESCE(pi.name, f.name) AS name,
COALESCE(u.gender, f.gender) AS gender,
COALESCE(pi.email, f.email) AS email,
COALESCE(u.remarks, f.remarks) AS remarks
FROM
USER_TABLE AS u
FULL OUTER JOIN
USER_PII AS pi
ON
u.user_id = pi.user_id
LEFT OUTER JOIN
FULL_TABLE AS f
ON
u.user_id = f.user_id
OR pi.user_id = f.user_id
)
SELECT * FROM UNION_TABLE
Log output
13:23:38 target_relation: "dbt_db"."public"."user_full"
13:23:38 temp_relation: "user_full__dbt_tmp132338083823"
13:23:38 unique_key: user_id
13:23:38 dest_columns: [PostgresColumn(column='user_id', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='name', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='gender', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='email', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='remarks', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None)]
13:23:38 incremental_predicates: None
13:23:38 dest_cols_csv: "user_id", "name", "gender", "email", "remarks"
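For reference, substituting the logged values into the delete_insert_ex macro by hand gives roughly the following SQL. This is a manual expansion, not output captured from a run. The TMP___ suffix is a placeholder for the generated timestamp, and whitespace is tidied up.

```sql
-- unique_key is the string 'user_id', so the single-key branch is taken
create or replace table TMP___YYYYMMDDHHMMSS as (
  select distinct user_id
  from "user_full__dbt_tmp132338083823"
);

-- incremental_predicates is None, so no extra conditions are appended
delete from "dbt_db"."public"."user_full"
where (user_id) in (
  select (user_id)
  from TMP___YYYYMMDDHHMMSS
);

drop table if exists TMP___YYYYMMDDHHMMSS;

insert into "dbt_db"."public"."user_full" ("user_id", "name", "gender", "email", "remarks")
(
  select "user_id", "name", "gender", "email", "remarks"
  from "user_full__dbt_tmp132338083823"
);
```

Writing the expansion out like this makes it easier to check that each statement is valid on the target warehouse before running the model.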
References
https://techblog.finatext.com/dbt-snowflake-incremental-exploding-joins-7ca8a6b484ca
Related articles
dbt ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ Environment Setup / Docker ~
https://dk521123.hatenablog.com/entry/2024/10/11/230419
dbt ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt ~ Updates / Basics ~
https://dk521123.hatenablog.com/entry/2023/12/07/060129
dbt ~ Updates / Update or Insert / Insert ~
https://dk521123.hatenablog.com/entry/2023/12/19/224453
dbt ~ Updates / Delete and Insert ~
https://dk521123.hatenablog.com/entry/2023/12/20/000104
dbt ~ Updates / DROP + CTAS ~
https://dk521123.hatenablog.com/entry/2023/12/04/000000
dbt ~ The modules Variable ~
https://dk521123.hatenablog.com/entry/2025/10/02/132545
dbt ~ Checking the Generated SQL ~
https://dk521123.hatenablog.com/entry/2024/08/25/002741