О стратегии инкрементального обновления
Существует несколько стратегий для реализации концепции инкрементальных материализаций. Ценность каждой стратегии зависит от:
- Объема данных.
- Надежности вашего
unique_key
. - Поддержки определенных функций в вашей платфо рме данных.
Некоторые адаптеры предоставляют необязательную конфигурацию incremental_strategy
, которая управляет кодом, используемым dbt для построения инкрементальных моделей.
microbatch
стратегия инкрементального обновления предназначена для больших наборов данных временных рядов. dbt будет обрабатывать инкрементальную модель в нескольких запросах (или "пакетах") на основе настроенного столбца event_time
. В зависимости от объема и характера ваших данных, это может быть более эффективным и устойчивым, чем использование одного запроса для добавления новых данных.
Поддерживаемые стратегии инкрементального обновления по адаптерам
Эта таблица представляет доступность каждой стратегии инкрементального обновления на основе последней версии dbt Core и каждого адаптера.
Нажмите на название адаптера в таблице ниже для получения дополнительной информации о поддерживаемых стратегиях инкрементального обновления.
Адаптер платформы данных | append | merge | delete+insert | insert_overwrite | microbatch beta |
---|---|---|---|---|---|
dbt-postgres | ✅ | ✅ | ✅ | ✅ | |
dbt-redshift | ✅ | ✅ | ✅ | ✅ | |
dbt-bigquery | ✅ | ✅ | ✅ | ||
dbt-spark | ✅ | ✅ | ✅ | ✅ | |
dbt-databricks | ✅ | ✅ | ✅ | ✅ | |
dbt-snowflake | ✅ | ✅ | ✅ | ✅ | |
dbt-trino | ✅ | ✅ | ✅ | ||
dbt-fabric | ✅ | ✅ | |||
dbt-athena | ✅ | ✅ | ✅ |
Настройка стратегии инкрементального обновления
Конфигурация incremental_strategy
может быть определена как в конкретных моделях, так и для всех моделей в вашем файле dbt_project.yml
:
models:
+incremental_strategy: "insert_overwrite"
или:
{{
config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='delete+insert',
...
)
}}
select ...
Конфигурации, специфичные для стратегии
Если вы используете стратегию merge
и указываете unique_key
, по умолчанию dbt полностью перезапишет совпадающие строки новыми значениями.
На адаптерах, которые поддерживают стратегию merge
(включая Snowflake, BigQuery, Apache Spark и Databricks), вы можете дополнительно передать список имен столбцов в конфигурацию merge_update_columns
. В этом случае dbt обновит только столбцы, указанные в конфигурации, и сохранит предыдущие значения других столбцов.
{{
config(
materialized = 'incremental',
unique_key = 'id',
merge_update_columns = ['email', 'ip_address'],
...
)
}}
select ...
В качестве альтернативы, вы можете указать список столбцов, которые следует исключить из обновления, передав список имен столбцов в конфигурацию merge_exclude_columns
.
{{
config(
materialized = 'incremental',
unique_key = 'id',
merge_exclude_columns = ['created_at'],
...
)
}}
select ...
О incremental_predicates
incremental_predicates
— это продвинутое использование инкрементальных моделей, когда объем данных достаточно велик, чтобы оправдать дополнительные инвестиции в производительность. Эта конфигурация принимает список любых допустимых SQL-выражений. dbt не проверяет синтаксис SQL-выражений.
Пример конфигурации модели в файле yml
, который вы можете ожидать увидеть в Snowflake:
models:
- name: my_incremental_model
config:
materialized: incremental
unique_key: id
# это повлияет на то, как данные хранятся на диске и индексируются для ограничения сканирования
cluster_by: ['session_start']
incremental_strategy: merge
# это ограничивает сканирование существующей таблицы последними 7 днями данных
incremental_predicates: ["DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"]
# `incremental_predicates` принимает список SQL-выражений.
# `DBT_INTERNAL_DEST` и `DBT_INTERNAL_SOURCE` — это стандартные псевдонимы для целевой таблицы и временной таблицы соответственно во время инкрементального запуска с использованием стратегии merge.
В качестве альтернативы, вот те же конфигурации, настроенные в файле модели:
-- в models/my_incremental_model.sql
{{
config(
materialized = 'incremental',
unique_key = 'id',
cluster_by = ['session_start'],
incremental_strategy = 'merge',
incremental_predicates = [
"DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"
]
)
}}
...
Это создаст (в файле dbt.log
) оператор merge
вида:
merge into <existing_table> DBT_INTERNAL_DEST
from <temp_table_with_new_records> DBT_INTERNAL_SOURCE
on
-- уникальный ключ
DBT_INTERNAL_DEST.id = DBT_INTERNAL_SOURCE.id
and
-- пользовательский предикат: ограничивает сканирование данных в "старых" данных / существующей таблице
DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)
when matched then update ...
when not matched then insert ...
Ограничьте сканирование данных входящих таблиц в теле их инкрементальной модели SQL, что ограничит количество "новых" данных, обрабатываемых/трансформируемых.
with large_source_table as (
select * from {{ ref('large_source_table') }}
{% if is_incremental() %}
where session_start >= dateadd(day, -3, current_date)
{% endif %}
),
...
Синтаксис зависит от того, как вы настраиваете свою incremental_strategy
:
- Если вы используете стратегию
merge
, вам может потребоваться явно указать псевдонимы для любых столбцов с помощьюDBT_INTERNAL_DEST
("старые" данные) илиDBT_INTERNAL_SOURCE
("новые" данные). - Существует значительное концептуальное пересечение со стратегией инкрементального обновления
insert_overwrite
.
Встроенные стратегии
Прежде чем углубляться в пользовательские стратегии, важно понять встроенные стратегии инкрементального обновления в dbt и их соответствующие макросы:
incremental_strategy | Соответствующий макрос |
---|---|
append | get_incremental_append_sql |
delete+insert | get_incremental_delete_insert_sql |
merge | get_incremental_merge_sql |
insert_overwrite | get_incremental_insert_overwrite_sql |
microbatch beta | get_incremental_microbatch_sql |
Например, встроенная стратегия для append
может быть определена и использована с помощью следующих файлов:
{% macro get_incremental_append_sql(arg_dict) %}
{% do return(some_custom_macro_with_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
{% endmacro %}
{% macro some_custom_macro_with_sql(target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ temp_relation }}
)
{% endmacro %}
Определите модель models/my_model.sql:
{{ config(
materialized="incremental",
incremental_strategy="append",
) }}
select * from {{ ref("some_model") }}
Пользовательские стратегии
Пользовательские стратегии в настоящее время не поддерживаются на адаптерах BigQuery и Spark.
Начиная с dbt v1.2 и далее, пользователи имеют более простую альтернативу созданию совершенно новой материализации. Они определяют и используют свои собственные "пользовательские" стратегии инкрементального обновления, выполняя следующие шаги:
- Определите макрос с именем
get_incremental_STRATEGY_sql
. Обратите внимание, чтоSTRATEGY
является заполнителем, и вы должны заменить его на имя вашей пользовательской стратегии инкрементального обновления. - Настройте
incremental_strategy: STRATEGY
в инкремента льной модели.
dbt не будет проверять пользовательские стратегии, он просто будет искать макрос с таким именем и выдаст ошибку, если не сможет его найти.
Например, пользовательская стратегия с именем insert_only
может быть определена и использована с помощью следующих файлов:
{% macro get_incremental_insert_only_sql(arg_dict) %}
{% do return(some_custom_macro_with_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
{% endmacro %}
{% macro some_custom_macro_with_sql(target_relation, temp_relation, unique_key, dest_columns, incremental_predicates) %}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ temp_relation }}
)
{% endmacro %}
{{ config(
materialized="incremental",
incremental_strategy="insert_only",
...
) }}
...
Если вы используете пользовательский макрос microbatch, установите флаг поведения require_batched_execution_for_custom_microbatch_strategy
в вашем dbt_project.yml
, чтобы включить пакетное выполнение вашей пользовательской стратегии.
Пользовательские стратегии из пакета
Чтобы использовать пользовательскую стратегию инкрементального обновления merge_null_safe
из пакета example
:
- Установите пакет
- Добавьте следующий макрос в ваш проект:
{% macro get_incremental_merge_null_safe_sql(arg_dict) %}
{% do return(example.get_incremental_merge_null_safe_sql(arg_dict)) %}
{% endmacro %}