以下の内容はhttps://dk521123.hatenablog.com/entry/2025/10/01/173127より取得しました。


【dbt】dbt ~ 更新 / 独自ストラテジ ~

◾️はじめに

dbt の更新において、
incremental_strategyをカスタムすることを知ったので、
メモしておく

目次

【1】独自のincremental_strategyの作成手順
 Step1:マクロ「get_incremental_【STRATEGY】_sql」を作成
 Step2:incremental_strategy: 【STRATEGY】で呼び出す
【2】arg_dict
 1)target_relation
 2)temp_relation
 3)unique_key
 4)dest_columns
 5)incremental_predicates
【3】その他Tips
 1)get_quoted_csv
【4】サンプル

【1】独自のincremental_strategyの作成手順

https://docs.getdbt.com/docs/build/incremental-strategy#custom-strategies

に記載されている。

以下のStep1〜Step2のみで可能。

Step1:マクロ「get_incremental【STRATEGY】sql」を作成

* 「get_incremental_【STRATEGY】_sql」って名前のマクロを作成する
 => 【STRATEGY】部分は、独自のStrategy名をつける

macros/my_custom_strategies.sql

-- 今回は「insert_only」って名前のStrategy名で。
{% macro get_incremental_insert_only_sql(arg_dict) %}
  {% do return(some_custom_macro_with_sql(
     arg_dict["target_relation"],
     arg_dict["temp_relation"], 
     arg_dict["unique_key"],
     arg_dict["dest_columns"],
     arg_dict["incremental_predicates"]))
    %}
{% endmacro %}

{% macro some_custom_macro_with_sql(
    target_relation,
    temp_relation,
    unique_key,
    dest_columns,
    incremental_predicates)
%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

    insert into {{ target_relation }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ temp_relation }}
    )

{% endmacro %}

Step2:incremental_strategy: 【STRATEGY】で呼び出す

models/my_model.sql

{{ config(
    materialized="incremental",
    incremental_strategy="insert_only",
    ...
) }}

...

【2】arg_dict

* マクロ内で使われている引数 arg_dict["target_relation"] に関するメモ

1)target_relation

* モデルの出力先テーブル情報(e.g. demo_schema.demo_table)

2)temp_relation

* 新規データや差分データを格納された一時的に作成されるテーブル情報

3)unique_key

* Configに指定されているカラム名ORカラム名リスト

4)dest_columns

* 出力先テーブル(ターゲットテーブル)のカラム情報リスト

5)incremental_predicates

* WHERE句で使う追加条件式(e.g. ["update_at >= '2025-10-01'"])
 => config の incremental_predicates の値

https://docs.getdbt.com/docs/build/incremental-strategy#about-incremental_predicates

{{
  config(
    materialized = 'incremental',
    ...
    incremental_predicates = [
      "DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"
    ]
  )
}}

【3】その他Tips

1)get_quoted_csv

* カラム名のリストをカンマ区切りのクオート付きの文字列に変換
 => ソースコードは、以下にある
 (他にも「diff_columns」「diff_column_data_types」があった)

https://github.com/dbt-labs/dbt-adapters/blob/e964302121f70bacc8f827683c329e06696216cf/dbt/include/global_project/macros/materializations/models/incremental/column_helpers.sql
呼び出し例

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

【4】サンプル

macros/get_incremental_delete_insert_ex.sql

-- 今回は「delete_insert_ex」って名前のStrategy名で。
{% macro get_incremental_delete_insert_ex_sql(arg_dict) %}
  {% do return(default__get_incremental_delete_insert_ex_sql(
     arg_dict["target_relation"],
     arg_dict["temp_relation"], 
     arg_dict["unique_key"],
     arg_dict["dest_columns"],
     arg_dict["incremental_predicates"]))
    %}
{% endmacro %}

{% macro default__get_incremental_delete_insert_ex_sql(
    target_relation,
    temp_relation,
    unique_key,
    dest_columns,
    incremental_predicates)
%}
    {{ log("target_relation: " ~ target_relation, True) }}
    {{ log("temp_relation: " ~ temp_relation, True) }}
    {{ log("unique_key: " ~ unique_key, True) }}
    {{ log("dest_columns: " ~ dest_columns, True) }}
    {{ log("incremental_predicates: " ~ incremental_predicates, True) }}

    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {{ log("dest_cols_csv: " ~ dest_cols_csv, True) }}

    {% if unique_key %}
        {%- set delete_map = 'TMP___' ~ modules.datetime.date.today().strftime('%Y%m%d%H%M%S') -%}

        create or replace table {{ delete_map }} as
        (
            select distinct
            {% if unique_key is sequence and unique_key is not string %}
                {% for key in unique_key %}
                    {{ key }},
                {% endfor %}
            {% else %}
                {{ unique_key }}
            {% endif %}
            from {{ temp_relation }}
        );

        {% if unique_key is sequence and unique_key is not string %}
            delete from {{ target_relation }}
            using {{ delete_map }}
            where (
                {% for key in unique_key %}
                    {{ delete_map }}.{{ key }} = {{ target_relation }}.{{ key }}
                    {{ "and " if not loop.last}}
                {% endfor %}
                {% if incremental_predicates %}
                    {% for predicate in incremental_predicates %}
                        and {{ predicate }}
                    {% endfor %}
                {% endif %}
            );
        {% else %}
            delete from {{ target_relation }}
            where (
                {{ unique_key }}) in (
                select ({{ unique_key }})
                from {{ delete_map }}
            )
            {%- if incremental_predicates %}
                {% for predicate in incremental_predicates %}
                    and {{ predicate }}
                {% endfor %}
            {%- endif -%};
        {% endif %}

        drop table if exists {{ delete_map }};
    {% endif %}

    insert into {{ target_relation }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ temp_relation }}
    )

{% endmacro %}

models/user_full.sql

{{
    config(
        materialized='incremental',
        unique_key='user_id',
        incremental_strategy='delete_insert_ex'
    )
}}

WITH
FULL_TABLE AS (
    {% if is_incremental() %}
        SELECT * FROM {{ this }}
    {% else %}
        SELECT
            NULL AS user_id,
            NULL AS name,
            NULL AS gender,
            NULL AS email,
            NULL AS remarks
    {% endif %}
),
USER_TABLE AS (
    SELECT * FROM {{ ref('user') }}
),
USER_PII AS (
    SELECT * FROM {{ ref('user_pii') }}
),
UNION_TABLE AS (
    SELECT
    COALESCE(u.user_id, pi.user_id) AS user_id,
    COALESCE(pi.name, f.name) AS name,
    COALESCE(u.gender, f.gender) AS gender,
    COALESCE(pi.email, f.email) AS email,
    COALESCE(u.remarks, f.remarks) AS remarks
    FROM
    USER_TABLE AS u
    FULL OUTER JOIN
    USER_PII AS pi
    ON
    u.user_id = pi.user_id
    LEFT OUTER JOIN
    FULL_TABLE AS f
    ON
    u.user_id = f.user_id
    OR pi.user_id = f.user_id
)

SELECT * FROM UNION_TABLE

ログ出力

[0m13:23:38  target_relation: "dbt_db"."public"."user_full"
[0m13:23:38  temp_relation: "user_full__dbt_tmp132338083823"
[0m13:23:38  unique_key: user_id
[0m13:23:38  dest_columns: [PostgresColumn(column='user_id', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='name', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='gender', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='email', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None), PostgresColumn(column='remarks', dtype='text', char_size=None, numeric_precision=None, numeric_scale=None)]
[0m13:23:38  incremental_predicates: None
[0m13:23:38  dest_cols_csv: "user_id", "name", "gender", "email", "remarks"

参考文献

https://techblog.finatext.com/dbt-snowflake-incremental-exploding-joins-7ca8a6b484ca

関連記事

dbt ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/06/30/000000
dbt ~ 環境設定 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2024/10/11/230419
dbt ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/30/151003
dbt ~ 更新 / 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/12/07/060129
dbt ~ 更新 / Update or Insert / Insert ~
https://dk521123.hatenablog.com/entry/2023/12/19/224453
dbt ~ 更新 / Delete and Insert ~
https://dk521123.hatenablog.com/entry/2023/12/20/000104
dbt ~ 更新 / DROP + CTAS ~
https://dk521123.hatenablog.com/entry/2023/12/04/000000
dbt ~ modules変数 ~
https://dk521123.hatenablog.com/entry/2025/10/02/132545
dbt 〜 生成されるSQL文を確認する 〜
https://dk521123.hatenablog.com/entry/2024/08/25/002741




以上の内容はhttps://dk521123.hatenablog.com/entry/2025/10/01/173127より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14