О микробатчевых инкрементальных моделях
Используйте инкрементальные модели микробатчей для эффективной обработки больших наборов данных временных рядов.
Доступно для dbt «Latest» и dbt Core версии 1.9 и выше.
Если вы используете пользовательский макрос microbatch, установите флаг изменения поведения в вашем dbt_project.yml, чтобы включить пакетное выполнение. Если у вас нет пользовательского макроса microbatch, вам не нужно устанавливать этот флаг, так как dbt автоматически обработает микропакетирование для любой модели, использующей стратегию microbatch.
Прочитайте и примите участие в обсуждении: dbt Core#10672. Список поддерживаемых адаптеров см. в разделе Supported incremental strategies by adapter.
Что такое "microbatch" в dbt?
Инкрементальные модели в dbt — это материализация, предназначенная для эффективного обновления таблиц вашего хранилища данных путем трансформации и загрузки новых или измененных данных с момента последнего запуска. Вместо повторной обработки всего набора данных каждый раз, инкрементальные модели обрабатывают меньшее количество строк, а затем добавляют, обновляют или заменяют эти строки в существующей таблице. Это может значительно сократить время и ресурсы, необходимые для ваших преобразований данных.
Microbatch — это инкрементальная стратегия, предназначенная для больших наборов данных временных рядов:
- Она опирается исключительно на временной столбец (
event_time) для определения временных диапазонов при фильтрации. - Укажите столбец
event_timeдля вашей microbatch‑модели и её непосредственных родителей (upstream‑моделей). Обратите внимание: это отличается отpartition_by, который группирует строки в партиции.RequiredДля инкрементальных microbatch-моделей, если в ваших upstream-моделях не настроен параметр
event_time, dbt не сможет автоматически фильтровать их во время пакетной обработки и будет выполнять полное сканирование таблиц при каждом запуске batch.Чтобы избежать этого, настройте
event_timeдля каждой upstream-модели, которая должна фильтроваться. О том, как исключить модель из автоматической фильтрации, см. раздел opting out of auto-filtering. - Она дополняет, а не заменяет существующие инкрементальные стратегии, фокусируясь на эффективности и простоте пакетной обработки.
- В отличие от традиционных инкрементальных стратегий, microbatch позволяет повторно обрабатывать неудавшиеся батчи, автоматически определять параллельное выполнение батчей и устраняет необходимость реализовывать сложную условную логику для backfilling.
- Обратите внимание: microbatch может быть не лучшей стратегией для всех сценариев использования. Рассмотрите другие стратегии, если, например, у вас нет надёжного столбца
event_timeили если вам нужен больший контроль над инкрементальной логикой. Подробнее читайте в разделе Howmicrobatchcompares to other incremental strategies.
Как работает микробатч
Когда dbt запускает модель microbatch — будь то в первый раз, во время инкрементальных запусков или в указанных заполнениях пропусков — он разделит обработку на несколько запросов (или "пакетов"), основываясь на event_time и batch_size, которые вы настроили.
Каждый "пакет" соответствует одному ограниченному временному периоду (по умолчанию, один день данных). В то время как другие инкрементальные стратегии работают только с "старыми" и "новыми" данными, модели microbatch рассматривают каждый пакет как атомарную единицу, которую можно построить или заменить самостоятельно. Каждый пакет независим и idempotent.
Это мощная абстракция, которая позволяет dbt запускать пакеты отдельно, одновременно и повторно их независимо.
Поведение, зависящее от адаптера
Микропакетная стратегия dbt использует наиболее эффективный механизм, доступный для «полной пакетной» замены на каждом адаптере. Конкретная реализация может отличаться в зависимости от адаптера:
dbt-postgres: Использует стратегиюmerge, которая выполняет операции «update» или «insert».dbt-redshift: Использует стратегиюdelete+insert, которая выполняет «insert» или «replace».dbt-snowflake: Использует стратегиюdelete+insert, которая выполняет «insert» или «replace».dbt-bigquery: Использует стратегиюinsert_overwrite, которая выполняет «insert» или «replace».dbt-spark: Использует стратегиюinsert_overwrite, которая выполняет «insert» или «replace».dbt-databricks: Использует стратегиюreplace_where, которая выполняет «insert» или «replace».
Подробнее см. раздел supported incremental strategies by adapter.
Пример
Модель sessions агрегирует и обогащает данные, поступающие из двух других моделей:
page_views— это большая таблица временных рядов. Она содержит много строк, новые записи почти всегда поступают после существующих, и существующие записи редко обновляются. Она использует колонкуpage_view_startв качествеevent_time.customers— это относительно небольшая размерная таблица. Атрибуты клиентов часто обновляются и не в временном порядке — то есть, старые клиенты с такой же вероятностью могут изменить значения колонок, как и новые клиенты. Модель customers не настраивает колонкуevent_time.
В результате:
- Каждый пакет
sessionsбудет фильтроватьpage_viewsдо эквивалентного временно ограниченного пакета. - Таблица
customersне фильтруется, что приводит к полному сканированию для каждого пакета.
В дополнение к настройке event_time для целевой таблицы, вы также должны указать его для любых входящих моделей, которые вы хотите фильтровать, даже если у них разные временные колонки.
models:
- name: page_views
config:
event_time: page_view_start
Мы запускаем модель sessions для 1 октября 2024 года, а затем снова для 2 октября. Это приводит к следующим запросам:
- Model definition
- Compiled (Oct 1, 2024)
- Compiled (Oct 2, 2024)
event_time для модели sessions установлен на session_start, который отмечает начало сеанса пользователя на сайте. Эта настройка позволяет dbt объединять несколько просмотров страниц (каждый из которых отслеживается своими собственными временными метками page_view_start) в один сеанс. Таким образом, session_start различает время отдельных просмотров страниц от более широкого временного интервала всего сеанса пользователя.
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='session_start',
begin='2020-01-01',
batch_size='day'
) }}
with page_views as (
-- этот ref будет автоматически отфильтрован
select * from {{ ref('page_views') }}
),
customers as (
-- этот ref не будет
select * from {{ ref('customers') }}
)
select
page_views.id as session_id,
page_views.page_view_start as session_start,
customers.*
from page_views
left join customers
on page_views.customer_id = customers.id
with page_views as (
select * from (
-- отфильтровано по настроенному event_time
select * from "analytics"."page_views"
where page_view_start >= '2024-10-01 00:00:00' -- 1 октября
and page_view_start < '2024-10-02 00:00:00'
)
),
customers as (
select * from "analytics"."customers"
),
...
with page_views as (
select * from (
-- отфильтровано по настроенному event_time
select * from "analytics"."page_views"
where page_view_start >= '2024-10-02 00:00:00' -- 2 октября
and page_view_start < '2024-10-03 00:00:00'
)
),
customers as (
select * from "analytics"."customers"
),
...
dbt будет инструктировать платформу данных взять результат каждого пакетного запроса и вставить, обновить или заменить содержимое таблицы analytics.sessions за тот же день данных. Для выполнения этой операции dbt использует наиболее эффективный атомарный механизм «полной пакетной» замены, доступный на каждой платформе данных. Подробнее см. в разделе How microbatch works.
Не имеет значения, содержит ли таблица уже данные за этот день. При одинаковых входных данных результирующая таблица будет одинаковой, независимо от того, сколько раз пакет перепроцессируется.
Каждый пакет сеансов фильтрует page_views до соответствующего временно ограниченного пакета, но не фильтрует сеансы, выполняя полное сканирование для каждого пакета.Соответствующие конфигурации
Несколько конфигураций имеют отношение к моделям microbatch, и некоторые из них обязательны:
| Loading table... |
Обязательные конфигурации для конкретных адаптеров
Некоторые адаптеры требуют дополнительных конфигураций для стратегии microbatch. Это связано с тем, что каждый адаптер реализует стратегию microbatch по-разному.
Следующая таблица перечисляет обязательные конфигурации для конкретных адаптеров, в дополнение к стандартным конфигурациям microbatch:
| Loading table... |
Например, если вы используете dbt-postgres, настройте unique_key следующим образом:
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
unique_key='sales_id', ## обязательно для dbt-postgres
event_time='transaction_date',
begin='2023-01-01',
batch_size='day'
) }}
select
sales_id,
transaction_date,
customer_id,
product_id,
total_amount
from {{ source('sales', 'transactions') }}
В этом примере unique_key обязателен, потому что microbatch в dbt-postgres использует стратегию merge, которая требует unique_key для идентификации строк в хранилище данных, которые необходимо объединить. Без unique_key dbt не сможет сопоставить строки между входящим пакетом и существующей таблицей.
Полное обновление
В качестве лучшей практики мы рекомендуем настроить full_refresh: false для microbatch‑моделей, чтобы они игнорировали запуски с флагом --full-refresh.
Обратите внимание: запуск dbt run --full-refresh для microbatch‑модели сам по себе не приведёт к сбросу или перезагрузке данных, если вы также не укажете --event-time-start и --event-time-end. Без этих флагов dbt не знает, за какой временной диапазон нужно пересобрать данные. Для сброса данных используйте явные backfill‑запуски:
✅ Правильно:
dbt run --full-refresh --event-time-start "2024-01-01" --event-time-end "2024-02-01"
❌ Неправильно:
dbt run --full-refresh
Если вам нужно переобработать исторические данные, мы рекомендуем использовать целевой backfill с указанием --event-time-start и --event-time-end.
Использование
Вы должны написать запрос вашей модели для обработки (чтения и возврата) точно одного "пакета" данных. Это упрощающее предположение и мощное:
- Вам не нужно думать о фильтрации
is_incremental - Вам не нужно выбирать среди стратегий DML (вставка/объединение/замена)
- Вы можете предварительно просмотреть вашу модель и увидеть точные записи для данного пакета, которые появятся, когда этот пакет будет обработан и записан в таблицу
Когда вы запускаете модель microbatch, dbt оценит, какие пакеты нужно загрузить, разобьет их на SQL-запрос для каждого пакета и загрузит каждый независимо.
dbt автоматически будет фильтровать входные апстрим‑зависимости (source или ref), в которых определён event_time, на основе конфигураций lookback и batch_size для этой модели. Обратите внимание, что dbt не знает минимальное значение event_time в ваших данных — он использует только те конфигурации, которые вы укажете (например, begin, lookback), чтобы определить, какие батчи нужно запускать.
Если вы хотите обрабатывать данные, начиная с фактического начала вашего датасета, вы обязаны явно указать это с помощью конфигурации begin или флага --event-time-start.
Во время стандартных инкрементальных запусков dbt будет обрабатывать пакеты в соответствии с текущей временной меткой и настроенным lookback, с одним запросом на пакет.
Настройте lookback для перепроцессирования дополнительных пакетов во время стандартных инкрементальных запусковОтказ от автофильтрации
Если существует вышестоящая модель, в которой настроен event_time, но вы не хотите, чтобы ссылка на неё автоматически фильтровалась, можно явно указать ref('upstream_model').render() и тем самым отказаться от автофильтрации. В целом это не рекомендуется — большинство моделей, в которых настраивается event_time, достаточно большие, и если ссылка на такую модель не будет отфильтрована, каждый батч будет выполнять полный скан входной таблицы.
Заполнения пропусков
Будь то исправление ошибочных исходных данных или ретроспективное применение изменения в бизнес-логике, вам может понадобиться перепроцессировать большое количество исторических данных.
Заполнение пропусков в модели microbatch так же просто, как выбрать ее для запуска или сборки и указать "начало" и "конец" для event_time. Обратите внимание, что --event-time-start и --event-time-end являются взаимно необходимыми, то есть, если вы указываете одно, вы должны указать другое.
Как всегда, dbt будет обрабатывать пакеты между началом и концом как независимые запросы.
dbt run --event-time-start "2024-09-01" --event-time-end "2024-09-04"
Настройте lookback, чтобы повторно обрабатывать дополнительные батчи во время стандартных инкрементальных запусковЕсли один или несколько ваших пакетов не удается, вы можете использовать dbt retry, чтобы перепроцессировать только неудачные пакеты.
Часовые пояса
На данный момент dbt предполагает, что все предоставленные значения находятся в UTC:
event_timebegin--event-time-start--event-time-end
Хотя мы можем рассмотреть возможность добавления поддержки пользовательских часовых поясов в будущем, мы также считаем, что определение этих значений в UTC упрощает жизнь всем.
Сравнение microbatch с другими инкрементальными стратегиями
По мере того, как хранилища данных внедряют новые операции для одновременной замены/объединения разделов данных, мы можем обнаружить, что новая операция для хранилища данных более эффективна, чем то, что адаптер использует для microbatch. В таких случаях мы оставляем за собой право обновить стандартную операцию для microbatch, при условии, что она работает как задумано/документировано для моделей, соответствующих парадигме microbatch.
Большинство инкрементальных моделей полагаются на конечного пользователя (вас), чтобы явно указать dbt, что означает "новый" в контексте каждой модели, написав фильтр в условном блоке {% if is_incremental() %}. Вы несете ответственность за создание этого SQL таким образом, чтобы он запрашивал {{ this }} для проверки, когда последняя запись была загружена, с необязательным окном обратного просмотра для записей, поступивших с опозданием.
Другие инкрементальные стратегии будут контролировать как данные добавляются в таблицу — будь то только добавление insert, delete + insert, merge, insert overwrite и т.д. — но у всех них есть это общее.
В качестве примера:
{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='date_day'
)
}}
select * from {{ ref('stg_events') }}
{% if is_incremental() %}
-- этот фильтр будет применен только при инкрементальном запуске
-- добавьте окно обратного просмотра в 3 дня, чтобы учесть записи, поступившие с опозданием
where date_day >= (select {{ dbt.dateadd("day", -3, "max(date_day)") }} from {{ this }})
{% endif %}
Для этой инкрементальной модели:
- "Новые" записи — это те, у которых
date_dayбольше, чем максимальныйdate_day, который был загружен ранее - Окно обратного просмотра составляет 3 дня
- Когда есть новые записи для данного
date_day, существующие данные дляdate_dayудаляются, и новые данные вставляются
Давайте возьмем наш предыдущий пример и вместо этого используем новую инкрементальную стратегию microbatch:
{{
config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_occured_at',
batch_size='day',
lookback=3,
begin='2020-01-01',
full_refresh=false
)
}}
select * from {{ ref('stg_events') }} -- этот ref будет автоматически отфильтрован
Где вы также установили event_time для прямых родителей модели - в данном случае, stg_events:
models:
- name: stg_events
config:
event_time: my_time_field
И это все!
Когда вы запускаете модель, каждый пакет шаблонирует отдельный запрос. Например, если вы запускали модель 1 октября, dbt шаблонировал бы отдельные запросы для каждого дня между 28 сентября и 1 октября включительно — всего четыре пакета.
Запрос для 2024-10-01 выглядел бы так:
select * from (
select * from "analytics"."stg_events"
where my_time_field >= '2024-10-01 00:00:00'
and my_time_field < '2024-10-02 00:00:00'
)
На основе вашей платформы данных dbt выберет наиболее эффективный атомарный механизм для вставки, обновления или замены этих четырех пакетов (2024-09-28, 2024-09-29, 2024-09-30 и 2024-10-01) в существующей таблице.
