Перейти к основному содержимому

Настройка инкрементальных моделей

Инкрементальные модели создаются в виде таблиц в вашем . При первом запуске модели создается путем трансформации всех строк исходных данных. При последующих запусках dbt трансформирует только те строки в ваших исходных данных, которые вы указали dbt отфильтровать, вставляя их в целевую таблицу, которая уже была создана.

Часто строки, которые вы фильтруете при инкрементальном запуске, будут строками в ваших исходных данных, которые были созданы или обновлены с момента последнего запуска dbt. Таким образом, при каждом запуске dbt ваша модель строится инкрементально.

Использование инкрементальной модели ограничивает объем данных, которые необходимо трансформировать, значительно сокращая время выполнения ваших трансформаций. Это улучшает производительность хранилища и снижает затраты на вычисления.

Настройка инкрементальной материализации

Как и другие материализации, встроенные в dbt, инкрементальные модели определяются с помощью select-запросов, с материализацией, определенной в блоке конфигурации.

{{
config(
materialized='incremental'
)
}}

select ...

Чтобы использовать инкрементальные модели, вам также нужно указать dbt:

  • Как фильтровать строки при инкрементальном запуске
  • Уникальный ключ модели (если он есть)

Понимание макроса is_incremental()

Макрос is_incremental() управляет инкрементальной материализацией. Он вернет True, если выполнены все следующие условия:

  • Модель уже должна существовать в базе данных
  • Целевая таблица уже существует в базе данных
  • Флаг full-refresh не передан
  • Запускаемая модель настроена с materialized='incremental'

Обратите внимание, что SQL в вашей модели должен быть действительным, независимо от того, оценивается ли is_incremental() как True или False.

Фильтрация строк при инкрементальном запуске

Чтобы указать dbt, какие строки он должен трансформировать при инкрементальном запуске, оберните действительный SQL, который фильтрует эти строки, в макрос is_incremental().

Часто вы захотите фильтровать "новые" строки, то есть строки, которые были созданы с момента последнего запуска dbt этой модели. Лучший способ найти временную метку последнего запуска этой модели — это проверить самую последнюю временную метку в вашей целевой таблице. dbt упрощает запрос вашей целевой таблицы, используя переменную "{{ this }}".

Также часто требуется захватить как новые, так и обновленные записи. Для обновленных записей вам нужно будет определить уникальный ключ, чтобы гарантировать, что вы не добавите измененные записи как дубликаты. Ваш код is_incremental() будет проверять строки, созданные или измененные с момента последнего запуска dbt этой модели.

Например, модель, включающая вычислительно медленную трансформацию в столбце, может быть построена инкрементально следующим образом:

models/stg_events.sql
{{
config(
materialized='incremental'
)
}}

select
*,
my_slow_function(my_column)

from {{ ref('app_data_events') }}

{% if is_incremental() %}

-- этот фильтр будет применяться только при инкрементальном запуске
-- (использует >= для включения записей, временная метка которых произошла с момента последнего запуска этой модели)
-- (Если event_time равно NULL или таблица обрезана, условие всегда будет истинным и загрузит все записи)
where event_time >= (select coalesce(max(event_time),'1900-01-01') from {{ this }} )

{% endif %}
Оптимизация вашей инкрементальной модели

Для более сложных инкрементальных моделей, использующих Общие Табличные Выражения (CTE), следует учитывать влияние позиции макроса is_incremental() на производительность запроса. В некоторых хранилищах фильтрация записей на раннем этапе может значительно улучшить время выполнения вашего запроса!

Определение уникального ключа (опционально)

unique_key позволяет обновлять существующие строки вместо простого добавления новых строк. Если новая информация поступает для существующего unique_key, эта новая информация может заменить текущую информацию вместо добавления в таблицу. Если поступает дублирующаяся строка, она может быть проигнорирована. Обратитесь к конфигурациям, специфичным для стратегии для получения дополнительных опций управления этим поведением обновления, например, выбора только определенных столбцов для обновления.

Не указание unique_key приведет к поведению только добавления, что означает, что dbt вставляет все строки, возвращенные SQL модели, в уже существующую целевую таблицу, не учитывая, представляют ли строки дубликаты.

Опциональный параметр unique_key указывает поле (или комбинацию полей), которое определяет зерно вашей модели. То есть, поле(я) идентифицируют одну уникальную строку. Вы можете определить unique_key в блоке конфигурации в начале вашей модели, и это может быть одно имя столбца или список имен столбцов.

unique_key должен быть указан в определении вашей модели в виде строки, представляющей один столбец, или списка имен столбцов в одинарных кавычках, которые могут использоваться вместе, например, ['col1', 'col2', …]). Столбцы, используемые таким образом, не должны содержать null-значений, иначе инкрементальная модель может не сопоставить строки и сгенерировать дубликаты. Либо убедитесь, что каждый столбец не содержит null-значений (например, с помощью coalesce(COLUMN_NAME, 'VALUE_IF_NULL')), либо определите одно-столбцовый суррогатный ключ (например, с помощью dbt_utils.generate_surrogate_key).

подсказка

В случаях, когда вам нужно несколько столбцов в комбинации для уникальной идентификации каждой строки, мы рекомендуем передавать эти столбцы в виде списка (unique_key = ['user_id', 'session_number']), а не строкового выражения (unique_key = 'concat(user_id, session_number)').

Используя первый синтаксис, который более универсален, dbt может гарантировать, что столбцы будут шаблонизированы в вашей инкрементальной материализации модели таким образом, который подходит для вашей базы данных.

Когда вы передаете список таким образом, пожалуйста, убедитесь, что каждый столбец не содержит null-значений, иначе инкрементальный запуск модели может не удаться.

В качестве альтернативы, вы можете определить одно-столбцовый суррогатный ключ, например, с помощью dbt_utils.generate_surrogate_key.

Когда вы определяете unique_key, вы увидите следующее поведение для каждой строки "новых" данных, возвращаемых вашей моделью dbt:

  • Если тот же unique_key присутствует в "новых" и "старых" данных модели, dbt обновит/заменит старую строку новой строкой данных. Точные механизмы того, как это обновление/замена происходит, будут варьироваться в зависимости от вашей базы данных, инкрементальной стратегии и конфигураций, специфичных для стратегии.
  • Если unique_key не присутствует в "старых" данных, dbt вставит всю строку в таблицу.

Обратите внимание, что если существует уникальный ключ с более чем одной строкой как в существующей целевой таблице, так и в новых инкрементальных строках, инкрементальная модель может не удаться в зависимости от вашей базы данных и инкрементальной стратегии. Если у вас возникают проблемы с запуском инкрементальной модели, рекомендуется дважды проверить, что уникальный ключ действительно уникален как в вашей существующей таблице базы данных, так и в ваших новых инкрементальных строках. Вы можете узнать больше о суррогатных ключах здесь.

к сведению

Хотя общие инкрементальные стратегии, такие как delete+insert + merge, могут использовать unique_key, другие не используют. Например, стратегия insert_overwrite не использует unique_key, так как она работает с разделами данных, а не с отдельными строками. Для получения дополнительной информации см. О стратегии incremental_strategy.

Пример unique_key

Рассмотрим модель, которая вычисляет количество активных пользователей в день (DAU) на основе потока событий. По мере поступления исходных данных вы захотите пересчитать количество DAU как за день, когда dbt последний раз запускался, так и за все дни с тех пор. Модель будет выглядеть следующим образом:

models/staging/fct_daily_active_users.sql
{{
config(
materialized='incremental',
unique_key='date_day'
)
}}

select
date_trunc('day', event_at) as date_day,
count(distinct user_id) as daily_active_users

from {{ ref('app_data_events') }}


{% if is_incremental() %}

-- этот фильтр будет применяться только при инкрементальном запуске
-- (использует >= для включения записей, поступающих позже в тот же день, когда последний раз запускалась эта модель)
where date_day >= (select coalesce(max(date_day), '1900-01-01') from {{ this }})

{% endif %}

group by 1

Построение этой модели инкрементально без параметра unique_key приведет к множеству строк в целевой таблице за один день – по одной строке за каждый раз, когда dbt запускается в этот день. Вместо этого включение параметра unique_key гарантирует, что существующая строка будет обновлена.

Как перестроить инкрементальную модель?

Если логика вашей инкрементальной модели изменилась, трансформации ваших новых строк данных могут отличаться от исторических трансформаций, которые хранятся в вашей целевой таблице. В этом случае вам следует перестроить вашу инкрементальную модель.

Чтобы заставить dbt перестроить всю инкрементальную модель с нуля, используйте флаг --full-refresh в командной строке. Этот флаг заставит dbt удалить существующую целевую таблицу в базе данных перед ее перестроением за все время.

$ dbt run --full-refresh --select my_incremental_model+

Также рекомендуется перестроить любые модели, зависящие от этой, как указано в завершающем +.

Вы можете опционально использовать full_refresh config, чтобы установить ресурс для всегда или никогда полного обновления на уровне проекта или ресурса. Если указано как true или false, конфигурация full_refresh будет иметь приоритет над наличием или отсутствием флага --full-refresh.

Для получения подробных инструкций по использованию ознакомьтесь с документацией dbt run.

Что делать, если столбцы моей инкрементальной модели изменяются?

Инкрементальные модели могут быть настроены для включения опционального параметра on_schema_change, чтобы обеспечить дополнительный контроль при изменении столбцов инкрементальной модели. Эти опции позволяют dbt продолжать запускать инкрементальные модели при наличии изменений в схеме, что приводит к меньшему количеству сценариев --full-refresh и экономии затрат на запросы.

Вы можете настроить параметр on_schema_change следующим образом.

dbt_project.yml
models:
+on_schema_change: "sync_all_columns"
models/staging/fct_daily_active_users.sql
{{
config(
materialized='incremental',
unique_key='date_day',
on_schema_change='fail'
)
}}

Возможные значения для on_schema_change:

  • ignore: Поведение по умолчанию (см. ниже).
  • fail: Вызывает сообщение об ошибке, когда исходная и целевая схемы расходятся.
  • append_new_columns: Добавляет новые столбцы в существующую таблицу. Обратите внимание, что эта настройка не удаляет столбцы из существующей таблицы, которые отсутствуют в новых данных.
  • sync_all_columns: Добавляет любые новые столбцы в существующую таблицу и удаляет любые столбцы, которые теперь отсутствуют. Обратите внимание, что это включает изменения типов данных. На BigQuery изменение типов столбцов требует полного сканирования ; будьте внимательны к компромиссам при реализации.

Примечание: Ни одно из поведений on_schema_change не заполняет значения в старых записях для вновь добавленных столбцов. Если вам нужно заполнить эти значения, мы рекомендуем выполнять ручные обновления или запускать --full-refresh.

on_schema_change отслеживает изменения на верхнем уровне

В настоящее время on_schema_change отслеживает только изменения на верхнем уровне столбцов. Он не отслеживает изменения вложенных столбцов. Например, в BigQuery добавление, удаление или изменение вложенного столбца не вызовет изменения схемы, даже если on_schema_change настроен соответствующим образом.

Поведение по умолчанию

Это поведение on_schema_change: ignore, которое установлено по умолчанию.

Если вы добавите столбец в вашу инкрементальную модель и выполните dbt run, этот столбец не появится в вашей целевой таблице.

Если вы удалите столбец из вашей инкрементальной модели и выполните dbt run, dbt run завершится с ошибкой.

Вместо этого, всякий раз, когда логика вашей инкрементальной модели изменяется, выполните полный обновляющий запуск как вашей инкрементальной модели, так и любых зависимых моделей.

Вопросы от сообщества

0