Настройка инкрементальных моделей
Узнайте, как настраивать и оптимизировать инкрементальные модели при разработке в dbt.
Инкрементальные модели создаются в виде таблиц в вашем data warehouse. При первом запуске модели table создается путем трансформации всех строк исходных данных. При последующих запусках dbt трансформирует только те строки в ваших исходных данных, которые вы указали dbt отфильтровать, вставляя их в целевую таблицу, которая уже была создана.
Часто строки, которые вы фильтруете при инкрементальном запуске, будут строками в ваших исходных данных, которые были созданы или обновлены с момента последнего запуска dbt. Таким образом, при каждом запуске dbt ваша модель строится инкрементально.
Использование инкрементальной модели ограничивает объем данных, которые необходимо трансформировать, значительно сокращая время выполнения ваших трансформаций. Это улучшает производительность хранилища и снижает затраты на вычисления.
Настройка инкрементальной материализации
Как и другие материализации, встроенные в dbt, инкрементальные модели определяются с помощью select-запросов, с материализацией, определенной в блоке конфигурации.
{{
config(
materialized='incremental'
)
}}
select ...
Чтобы использовать инкрементальные модели, вам также нужно указать dbt:
- Как фильтровать строки при инкрементальном запуске
- Уникальный ключ модели (если он есть)
Понимание макроса is_incremental()
Макрос is_incremental() управляет инкрементальной материализацией. Он вернет True, если выполнены все следующие условия:
- Модель уже должна существовать в базе данных в виде таблицы
- Флаг
full-refreshне передан - Запускаемая модель настроена с
materialized='incremental'
Обратите внимание, что SQL в вашей модели должен быть действительным, независимо от того, оценивается ли is_incremental() как True или False.
Фильтрация строк при инкрементальном запуске
Чтобы указать dbt, какие строки он должен трансформировать при инкрементальном запуске, оберните действительный SQL, который фильтрует эти строки, в макрос is_incremental().
Часто вы захотите фильтровать "новые" строки, то есть строки, которые были созданы с момента последнего запуска dbt этой модели. Лучший способ найти временную метку последнего запуска этой модели — это проверить самую последнюю временную метку в вашей целевой таблице. dbt упрощает запрос вашей целевой таблицы, используя переменную "{{ this }}".
Также часто требуется захватить как новые, так и обновленные записи. Для обновленных записей вам нужно будет определить уникальный ключ, чтобы гарантировать, что вы не добавите измененные записи как дубликаты. Ваш код is_incremental() будет проверять строки, созданные или измененные с момента последнего запуска dbt этой модели.
Например, модель, включающая вычислительно медленную трансформацию в столбце, может быть построена инкрементально следующим образом:
{{
config(
materialized='incremental'
)
}}
select
*,
my_slow_function(my_column)
from {{ ref('app_data_events') }}
{% if is_incremental() %}
-- этот фильтр будет применяться только при инкрементальном запуске
-- (использует >= для включения записей, временная метка которых произошла с момента последнего запуска этой модели)
-- (Если event_time равно NULL или таблица обрезана, условие всегда будет истинным и загрузит все записи)
where event_time >= (select coalesce(max(event_time),'1900-01-01') from {{ this }} )
{% endif %}
Для более сложных инкрементальных моделей, использующих Общие Табличные Выражения (CTE), следует учитывать влияние позиции макроса is_incremental() на производительность запроса. В некоторых хранилищах фильтрация записей на раннем этапе может значительно улучшить время выполнения вашего запроса!
Об incremental_predicates
incremental_predicates — это продвинутое использование инкрементальных моделей, применяемое в случаях, когда объём данных достаточно велик, чтобы оправдать дополнительные инвестиции в производительность. Этот параметр конфигурации принимает список любых допустимых SQL-выражений. dbt не проверяет синтаксис SQL-выражений, указанных здесь.
Ниже приведён пример конфигурации модели в yml-файле, который можно встретить при работе со Snowflake:
models:
- name: my_incremental_model
config:
materialized: incremental
unique_key: id
# this will affect how the data is stored on disk, and indexed to limit scans
cluster_by: ['session_start']
incremental_strategy: merge
# this limits the scan of the existing table to the last 7 days of data
incremental_predicates: ["DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"]
# `incremental_predicates` accepts a list of SQL statements.
# `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` are the standard aliases for the target table and temporary table, respectively, during an incremental run using the merge strategy.
В качестве альтернативы, те же самые параметры могут быть заданы непосредственно в файле модели:
-- in models/my_incremental_model.sql
{{
config(
materialized = 'incremental',
unique_key = 'id',
cluster_by = ['session_start'],
incremental_strategy = 'merge',
incremental_predicates = [
"DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"
]
)
}}
...
В результате dbt сгенерирует (в файле dbt.log) SQL-выражение merge, выглядящее примерно так:
merge into <existing_table> DBT_INTERNAL_DEST
from <temp_table_with_new_records> DBT_INTERNAL_SOURCE
on
-- unique key
DBT_INTERNAL_DEST.id = DBT_INTERNAL_SOURCE.id
and
-- custom predicate: limits data scan in the "old" data / existing table
DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)
when matched then update ...
when not matched then insert ...
Чтобы ограничить сканирование данных в вышестоящих (upstream) таблицах, используйте условия непосредственно в SQL-коде инкрементальной модели. Это позволит сократить объём «новых» данных, которые необходимо обрабатывать и трансформировать.
with large_source_table as (
select * from {{ ref('large_source_table') }}
{% if is_incremental() %}
where session_start >= dateadd(day, -3, current_date)
{% endif %}
),
...
Определение уникального ключа
Определение необязательного параметра unique_key позволяет обновлять существующие строки, а не просто добавлять новые. Если для уже существующего unique_key поступает новая информация, она может заменить текущие данные вместо того, чтобы быть добавленной в таблицу. Если приходит дублирующаяся строка, она может быть проигнорирована. Подробнее о дополнительных возможностях управления этим поведением обновления (например, выборе конкретных столбцов для обновления) см. в разделе strategy specific configs.
Если вы не указываете unique_key, большинство адаптеров будут работать в режиме append-only. Это означает, что dbt вставляет все строки, возвращаемые SQL-запросом модели, в уже существующую целевую таблицу, не проверяя, являются ли эти строки дубликатами.
Опциональный параметр unique_key указывает поле (или комбинацию полей), которое определяет зерно вашей модели. То есть, поле(я) идентифицируют одну уникальную строку. Вы можете определить unique_key в блоке конфигурации в начале вашей модели, и это может быть одно имя столбца или список имен столбцов.
unique_key должен быть указан в определении вашей модели в виде строки, представляющей один столбец, или списка имен столбцов в одинарных кавычках, которые могут использоваться вместе, например, ['col1', 'col2', …]). Столбцы, используемые таким образом, не должны содержать null-значений, иначе инкрементальная модель может не сопоставить строки и сгенерировать дубликаты. Либо убедитесь, что каждый столбец не содержит null-значений (например, с помощью coalesce(COLUMN_NAME, 'VALUE_IF_NULL')), либо определите одно-столбцовый суррогатный ключ (например, с помощью dbt_utils.generate_surrogate_key).
В случаях, когда вам нужно несколько столбцов в комбинации для уникальной идентификации каждой строки, мы рекомендуем передавать эти столбцы в виде списка (unique_key = ['user_id', 'session_number']), а не строкового выражения (unique_key = 'concat(user_id, session_number)').
Используя первый синтаксис, который более универсален, dbt может гарантировать, что столбцы будут шаблонизированы в вашей инкрементальной материализации модели таким образом, который подходит для вашей базы данных.
Когда вы передаете список таким образом, пожалуйста, убедитесь, что каждый столбец не содержит null-значений, иначе инкрементальный запуск модели может не удаться.
В качестве альтернативы, вы можете определить одно-столбцовый суррогатный ключ, например, с помощью dbt_utils.generate_surrogate_key.
Когда вы определяете unique_key, вы увидите следующее поведение для каждой строки "новых" данных, возвращаемых вашей моделью dbt:
- Если один и тот же
unique_keyприсутствует и в «новых», и в «старых» данных модели, dbt обновит или заменит старую строку новой строкой данных. Точный механизм того, как именно происходит это обновление/замена, зависит от используемой базы данных, incremental strategy и strategy specific configs. - Если
unique_keyне присутствует в «старых» данных, dbt вставит всю строку целиком в таблицу.
Обратите внимание, что если существует уникальный ключ с более чем одной строкой как в существующей целевой таблице, так и в новых инкрементальных строках, инкрементальная модель может не удаться в зависимости от вашей базы данных и инкрементальной стратегии. Если у вас возникают проблемы с запуском инкрементальной модели, рекомендуется дважды проверить, что уникальный ключ действительно уникален как в вашей существующей таблице базы данных, так и в ваших новых инкрементальных строках. Вы можете узнать больше о суррогатных ключах здесь.
Хотя общие инкрементальные стратегии, такие как delete+insert + merge, могут использовать unique_key, другие не используют. Например, стратегия insert_overwrite не использует unique_key, так как она работает с разделами данных, а не с отдельными строками. Для получения дополнительной информации см. О стратегии incremental_strategy.
Пример unique_key
Рассмотрим модель, которая вычисляет количество активных пользователей в день (DAU) на основе потока событий. По мере поступления исходных данных вы захотите пересчитать количество DAU как за день, когда dbt последний раз запускался, так и за все дни с тех пор. Модель будет выглядеть следующим образом:
{{
config(
materialized='incremental',
unique_key='date_day'
)
}}
select
date_trunc('day', event_at) as date_day,
count(distinct user_id) as daily_active_users
from {{ ref('app_data_events') }}
{% if is_incremental() %}
-- этот фильтр будет применяться только при инкрементальном запуске
-- (использует >= для включения записей, поступающих позже в тот же день, когда последний раз запускалась эта модель)
where date_day >= (select coalesce(max(date_day), '1900-01-01') from {{ this }})
{% endif %}
group by 1
Построение этой модели инкрементально без параметра unique_key приведет к множеству строк в целевой таблице за один день – по одной строке за каждый раз, когда dbt запускается в этот день. Вместо этого включение параметра unique_key гарантирует, что существующая строка будет обновлена.
Как перестроить инкрементальную модель?
Если логика вашей инкрементальной модели изменилась, трансформации ваших новых строк данных могут отличаться от исторических трансформаций, которые хранятся в вашей целевой таблице. В этом случае вам следует перестроить вашу инкрементальную модель.
Чтобы заставить dbt перестроить всю инкрементальную модель с нуля, используйте флаг --full-refresh в командной строке. Этот флаг заставит dbt удалить существующую целевую таблицу в базе данных перед ее перестроением за все время.
$ dbt run --full-refresh --select my_incremental_model+
Завершающий символ + в приведённой выше команде также запустит все downstream‑модели, которые зависят от my_incremental_model. Если какие‑либо из этих downstream‑зависимостей тоже являются инкрементальными моделями, они также будут полностью пересобраны (fully refreshed).
Вы можете опционально использовать full_refresh config, чтобы установить ресурс для всегда или никогда полного обновления на уровне проекта или ресурса. Если указано как true или false, конфигурация full_refresh будет иметь приоритет над наличием или отсутствием флага --full-refresh.
Для получения подробных инструкций по использованию ознакомьтесь с документацией dbt run.
Что делать, если столбцы моей инкрементальной модели изменяются?
Инкрементальные модели могут быть настроены для включения опционального параметра on_schema_change, чтобы обеспечить дополнительный контроль при изменении столбцов инкрементальной модели. Эти опции позволяют dbt продолжать запускать инкрементальные модели при наличии изменений в схеме, что приводит к меньшему количеству сценариев --full-refresh и экономии затрат на запросы.
Вы можете настроить параметр on_schema_change следующим образом.
models:
+on_schema_change: "sync_all_columns"
{{
config(
materialized='incremental',
unique_key='date_day',
on_schema_change='fail'
)
}}
Возможные значения для on_schema_change:
ignore: Поведение по умолчанию (см. ниже).fail: Вызывает ошибку, когда схемы источника и целевой таблицы расходятся.append_new_columns: Добавляет новые колонки в существующую таблицу. Обратите внимание, что этот параметр не удаляет колонки из существующей таблицы, если их нет в новых данных.sync_all_columns: Добавляет все новые колонки в существующую таблицу и удаляет колонки, которые теперь отсутствуют. Обратите внимание, что это поведение включает изменения типов данных. В BigQuery изменение типов колонок требует полного сканирования table; учитывайте связанные с этим компромиссы при реализации.
Примечание: Ни одно из поведений on_schema_change не заполняет значения в старых записях для вновь добавленных столбцов. Если вам нужно заполнить эти значения, мы рекомендуем выполнять ручные обновления или запускать --full-refresh.
on_schema_change отслеживает изменения на верхнем уровнеВ настоящее время on_schema_change отслеживает только изменения на верхнем уровне столбцов. Он не отслеживает изменения вложенных столбцов. Например, в BigQuery добавление, удаление или изменение вложенного столбца не вызовет изменения схемы, даже если on_schema_change настроен соответствующим образом.
Поведение по умолчанию
Это поведение on_schema_change: ignore, которое установлено по умолчанию.
Если вы добавите столбец в вашу инкрементальную модель и выполните dbt run, этот столбец не появится в вашей целевой таблице.
Если вы удалите столбец из вашей инкрементальной модели и выполните dbt run, dbt run завершится с ошибкой.
Вместо этого, всякий раз, когда логика вашей инкрементальной модели изменяется, выполните полный обновляющий запуск как вашей инкрементальной модели, так и любых зависимых моделей.