Перейти к основному содержимому

О микробатчевых инкрементальных моделях

Используйте инкрементальные модели микробатчей для эффективной обработки больших наборов данных временных рядов.

к сведению

Доступно для dbt «Latest» и dbt Core версии 1.9 и выше.

Если вы используете пользовательский макрос microbatch, установите флаг изменения поведения в вашем dbt_project.yml, чтобы включить пакетное выполнение. Если у вас нет пользовательского макроса microbatch, вам не нужно устанавливать этот флаг, так как dbt автоматически обработает микропакетирование для любой модели, использующей стратегию microbatch.

Прочитайте и примите участие в обсуждении: dbt Core#10672. Список поддерживаемых адаптеров см. в разделе Supported incremental strategies by adapter.

Что такое "microbatch" в dbt?

Инкрементальные модели в dbt — это материализация, предназначенная для эффективного обновления таблиц вашего хранилища данных путем трансформации и загрузки новых или измененных данных с момента последнего запуска. Вместо повторной обработки всего набора данных каждый раз, инкрементальные модели обрабатывают меньшее количество строк, а затем добавляют, обновляют или заменяют эти строки в существующей таблице. Это может значительно сократить время и ресурсы, необходимые для ваших преобразований данных.

Microbatch — это инкрементальная стратегия, предназначенная для больших наборов данных временных рядов:

  • Она опирается исключительно на временной столбец (event_time) для определения временных диапазонов при фильтрации.
  • Укажите столбец event_time для вашей microbatch‑модели и её непосредственных родителей (upstream‑моделей). Обратите внимание: это отличается от partition_by, который группирует строки в партиции.
    Required

    Для инкрементальных microbatch-моделей, если в ваших upstream-моделях не настроен параметр event_time, dbt не сможет автоматически фильтровать их во время пакетной обработки и будет выполнять полное сканирование таблиц при каждом запуске batch.

    Чтобы избежать этого, настройте event_time для каждой upstream-модели, которая должна фильтроваться. О том, как исключить модель из автоматической фильтрации, см. раздел opting out of auto-filtering.

  • Она дополняет, а не заменяет существующие инкрементальные стратегии, фокусируясь на эффективности и простоте пакетной обработки.
  • В отличие от традиционных инкрементальных стратегий, microbatch позволяет повторно обрабатывать неудавшиеся батчи, автоматически определять параллельное выполнение батчей и устраняет необходимость реализовывать сложную условную логику для backfilling.
  • Обратите внимание: microbatch может быть не лучшей стратегией для всех сценариев использования. Рассмотрите другие стратегии, если, например, у вас нет надёжного столбца event_time или если вам нужен больший контроль над инкрементальной логикой. Подробнее читайте в разделе How microbatch compares to other incremental strategies.

Как работает микробатч

Когда dbt запускает модель microbatch — будь то в первый раз, во время инкрементальных запусков или в указанных заполнениях пропусков — он разделит обработку на несколько запросов (или "пакетов"), основываясь на event_time и batch_size, которые вы настроили.

Каждый "пакет" соответствует одному ограниченному временному периоду (по умолчанию, один день данных). В то время как другие инкрементальные стратегии работают только с "старыми" и "новыми" данными, модели microbatch рассматривают каждый пакет как атомарную единицу, которую можно построить или заменить самостоятельно. Каждый пакет независим и idempotent.

Это мощная абстракция, которая позволяет dbt запускать пакеты отдельно, одновременно и повторно их независимо.

Поведение, зависящее от адаптера

Микропакетная стратегия dbt использует наиболее эффективный механизм, доступный для «полной пакетной» замены на каждом адаптере. Конкретная реализация может отличаться в зависимости от адаптера:

  • dbt-postgres: Использует стратегию merge, которая выполняет операции «update» или «insert».
  • dbt-redshift: Использует стратегию delete+insert, которая выполняет «insert» или «replace».
  • dbt-snowflake: Использует стратегию delete+insert, которая выполняет «insert» или «replace».
  • dbt-bigquery: Использует стратегию insert_overwrite, которая выполняет «insert» или «replace».
  • dbt-spark: Использует стратегию insert_overwrite, которая выполняет «insert» или «replace».
  • dbt-databricks: Использует стратегию replace_where, которая выполняет «insert» или «replace».

Подробнее см. раздел supported incremental strategies by adapter.

Пример

Модель sessions агрегирует и обогащает данные, поступающие из двух других моделей:

  • page_views — это большая таблица временных рядов. Она содержит много строк, новые записи почти всегда поступают после существующих, и существующие записи редко обновляются. Она использует колонку page_view_start в качестве event_time.
  • customers — это относительно небольшая размерная таблица. Атрибуты клиентов часто обновляются и не в временном порядке — то есть, старые клиенты с такой же вероятностью могут изменить значения колонок, как и новые клиенты. Модель customers не настраивает колонку event_time.

В результате:

  • Каждый пакет sessions будет фильтровать page_views до эквивалентного временно ограниченного пакета.
  • Таблица customers не фильтруется, что приводит к полному сканированию для каждого пакета.
подсказка

В дополнение к настройке event_time для целевой таблицы, вы также должны указать его для любых входящих моделей, которые вы хотите фильтровать, даже если у них разные временные колонки.

models/staging/page_views.yml
models:
- name: page_views
config:
event_time: page_view_start

Мы запускаем модель sessions для 1 октября 2024 года, а затем снова для 2 октября. Это приводит к следующим запросам:

event_time для модели sessions установлен на session_start, который отмечает начало сеанса пользователя на сайте. Эта настройка позволяет dbt объединять несколько просмотров страниц (каждый из которых отслеживается своими собственными временными метками page_view_start) в один сеанс. Таким образом, session_start различает время отдельных просмотров страниц от более широкого временного интервала всего сеанса пользователя.

models/sessions.sql
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='session_start',
begin='2020-01-01',
batch_size='day'
) }}

with page_views as (

-- этот ref будет автоматически отфильтрован
select * from {{ ref('page_views') }}

),

customers as (

-- этот ref не будет
select * from {{ ref('customers') }}

)

select
page_views.id as session_id,
page_views.page_view_start as session_start,
customers.*
from page_views
left join customers
on page_views.customer_id = customers.id

dbt будет инструктировать платформу данных взять результат каждого пакетного запроса и вставить, обновить или заменить содержимое таблицы analytics.sessions за тот же день данных. Для выполнения этой операции dbt использует наиболее эффективный атомарный механизм «полной пакетной» замены, доступный на каждой платформе данных. Подробнее см. в разделе How microbatch works.

Не имеет значения, содержит ли таблица уже данные за этот день. При одинаковых входных данных результирующая таблица будет одинаковой, независимо от того, сколько раз пакет перепроцессируется.

Каждый пакет сеансов фильтрует page_views до соответствующего временно ограниченного пакета, но не фильтрует сеансы, выполняя полное сканирование для каждого пакета.Каждый пакет сеансов фильтрует page_views до соответствующего временно ограниченного пакета, но не фильтрует сеансы, выполняя полное сканирование для каждого пакета.

Соответствующие конфигурации

Несколько конфигураций имеют отношение к моделям microbatch, и некоторые из них обязательны:

КонфигурацияОписаниеЗначение по умолчаниюТипОбязательно
event_timeКолонка, указывающая "в какое время произошла строка". Обязательно для вашей модели microbatch и любых прямых родителей, которые должны быть отфильтрованы.N/AКолонкаОбязательно
begin"Начало времени" для модели microbatch. Это отправная точка для любых начальных или полных обновлений. Например, модель microbatch с дневной зернистостью, запущенная 1 октября 2024 года с begin = '2023-10-01, обработает 366 пакетов (это високосный год!) плюс пакет для "сегодня".N/AДатаОбязательно
batch_sizeГранулярность ваших пакетов. Поддерживаемые значения: hour, day, month, и yearN/AСтрокаОбязательно
lookbackОбработать X пакетов перед последней закладкой, чтобы захватить записи, поступившие с опозданием.1Целое числоНеобязательно
concurrent_batchesПереопределяет автоматическое обнаружение dbt для выполнения пакетов одновременно (в одно и то же время). Подробнее о настройке параллельных пакетов. Установка на
* true выполняет пакеты одновременно (параллельно).
* false выполняет пакеты последовательно (один за другим).
NoneЛогическоеНеобязательно
Loading table...
Конфигурация колонки event_time настраивает реальное время этой записиКонфигурация колонки event_time настраивает реальное время этой записи

Обязательные конфигурации для конкретных адаптеров

Некоторые адаптеры требуют дополнительных конфигураций для стратегии microbatch. Это связано с тем, что каждый адаптер реализует стратегию microbatch по-разному.

Следующая таблица перечисляет обязательные конфигурации для конкретных адаптеров, в дополнение к стандартным конфигурациям microbatch:

АдаптерКонфигурация unique_keyКонфигурация partition_by
dbt-postgres✅ ОбязательноN/A
dbt-sparkN/A✅ Обязательно
dbt-bigqueryN/A✅ Обязательно
Loading table...

Например, если вы используете dbt-postgres, настройте unique_key следующим образом:

models/sessions.sql
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
unique_key='sales_id', ## обязательно для dbt-postgres
event_time='transaction_date',
begin='2023-01-01',
batch_size='day'
) }}

select
sales_id,
transaction_date,
customer_id,
product_id,
total_amount
from {{ source('sales', 'transactions') }}

В этом примере unique_key обязателен, потому что microbatch в dbt-postgres использует стратегию merge, которая требует unique_key для идентификации строк в хранилище данных, которые необходимо объединить. Без unique_key dbt не сможет сопоставить строки между входящим пакетом и существующей таблицей.

Полное обновление

В качестве лучшей практики мы рекомендуем настроить full_refresh: false для microbatch‑моделей, чтобы они игнорировали запуски с флагом --full-refresh.

Обратите внимание: запуск dbt run --full-refresh для microbatch‑модели сам по себе не приведёт к сбросу или перезагрузке данных, если вы также не укажете --event-time-start и --event-time-end. Без этих флагов dbt не знает, за какой временной диапазон нужно пересобрать данные. Для сброса данных используйте явные backfill‑запуски:

✅ Правильно:

dbt run --full-refresh --event-time-start "2024-01-01" --event-time-end "2024-02-01"

❌ Неправильно:

dbt run --full-refresh

Если вам нужно переобработать исторические данные, мы рекомендуем использовать целевой backfill с указанием --event-time-start и --event-time-end.

Использование

Вы должны написать запрос вашей модели для обработки (чтения и возврата) точно одного "пакета" данных. Это упрощающее предположение и мощное:

  • Вам не нужно думать о фильтрации is_incremental
  • Вам не нужно выбирать среди стратегий DML (вставка/объединение/замена)
  • Вы можете предварительно просмотреть вашу модель и увидеть точные записи для данного пакета, которые появятся, когда этот пакет будет обработан и записан в таблицу

Когда вы запускаете модель microbatch, dbt оценит, какие пакеты нужно загрузить, разобьет их на SQL-запрос для каждого пакета и загрузит каждый независимо.

dbt автоматически будет фильтровать входные апстрим‑зависимости (source или ref), в которых определён event_time, на основе конфигураций lookback и batch_size для этой модели. Обратите внимание, что dbt не знает минимальное значение event_time в ваших данных — он использует только те конфигурации, которые вы укажете (например, begin, lookback), чтобы определить, какие батчи нужно запускать.

Если вы хотите обрабатывать данные, начиная с фактического начала вашего датасета, вы обязаны явно указать это с помощью конфигурации begin или флага --event-time-start.

Во время стандартных инкрементальных запусков dbt будет обрабатывать пакеты в соответствии с текущей временной меткой и настроенным lookback, с одним запросом на пакет.

Настройте lookback для перепроцессирования дополнительных пакетов во время стандартных инкрементальных запусковНастройте lookback для перепроцессирования дополнительных пакетов во время стандартных инкрементальных запусков

Отказ от автофильтрации

Если существует вышестоящая модель, в которой настроен event_time, но вы не хотите, чтобы ссылка на неё автоматически фильтровалась, можно явно указать ref('upstream_model').render() и тем самым отказаться от автофильтрации. В целом это не рекомендуется — большинство моделей, в которых настраивается event_time, достаточно большие, и если ссылка на такую модель не будет отфильтрована, каждый батч будет выполнять полный скан входной таблицы.

Заполнения пропусков

Будь то исправление ошибочных исходных данных или ретроспективное применение изменения в бизнес-логике, вам может понадобиться перепроцессировать большое количество исторических данных.

Заполнение пропусков в модели microbatch так же просто, как выбрать ее для запуска или сборки и указать "начало" и "конец" для event_time. Обратите внимание, что --event-time-start и --event-time-end являются взаимно необходимыми, то есть, если вы указываете одно, вы должны указать другое.

Как всегда, dbt будет обрабатывать пакеты между началом и концом как независимые запросы.

dbt run --event-time-start "2024-09-01" --event-time-end "2024-09-04"
Настройте lookback, чтобы повторно обрабатывать дополнительные батчи во время стандартных инкрементальных запусковНастройте lookback, чтобы повторно обрабатывать дополнительные батчи во время стандартных инкрементальных запусков

Если один или несколько ваших пакетов не удается, вы можете использовать dbt retry, чтобы перепроцессировать только неудачные пакеты.

Частичная повторная попытка

Часовые пояса

На данный момент dbt предполагает, что все предоставленные значения находятся в UTC:

  • event_time
  • begin
  • --event-time-start
  • --event-time-end

Хотя мы можем рассмотреть возможность добавления поддержки пользовательских часовых поясов в будущем, мы также считаем, что определение этих значений в UTC упрощает жизнь всем.

Сравнение microbatch с другими инкрементальными стратегиями

По мере того, как хранилища данных внедряют новые операции для одновременной замены/объединения разделов данных, мы можем обнаружить, что новая операция для хранилища данных более эффективна, чем то, что адаптер использует для microbatch. В таких случаях мы оставляем за собой право обновить стандартную операцию для microbatch, при условии, что она работает как задумано/документировано для моделей, соответствующих парадигме microbatch.

Большинство инкрементальных моделей полагаются на конечного пользователя (вас), чтобы явно указать dbt, что означает "новый" в контексте каждой модели, написав фильтр в условном блоке {% if is_incremental() %}. Вы несете ответственность за создание этого SQL таким образом, чтобы он запрашивал {{ this }} для проверки, когда последняя запись была загружена, с необязательным окном обратного просмотра для записей, поступивших с опозданием.

Другие инкрементальные стратегии будут контролировать как данные добавляются в таблицу — будь то только добавление insert, delete + insert, merge, insert overwrite и т.д. — но у всех них есть это общее.

В качестве примера:

{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='date_day'
)
}}

select * from {{ ref('stg_events') }}

{% if is_incremental() %}
-- этот фильтр будет применен только при инкрементальном запуске
-- добавьте окно обратного просмотра в 3 дня, чтобы учесть записи, поступившие с опозданием
where date_day >= (select {{ dbt.dateadd("day", -3, "max(date_day)") }} from {{ this }})
{% endif %}

Для этой инкрементальной модели:

  • "Новые" записи — это те, у которых date_day больше, чем максимальный date_day, который был загружен ранее
  • Окно обратного просмотра составляет 3 дня
  • Когда есть новые записи для данного date_day, существующие данные для date_day удаляются, и новые данные вставляются

Давайте возьмем наш предыдущий пример и вместо этого используем новую инкрементальную стратегию microbatch:

models/staging/stg_events.sql
{{
config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_occured_at',
batch_size='day',
lookback=3,
begin='2020-01-01',
full_refresh=false
)
}}

select * from {{ ref('stg_events') }} -- этот ref будет автоматически отфильтрован

Где вы также установили event_time для прямых родителей модели - в данном случае, stg_events:

models/staging/stg_events.yml
models:
- name: stg_events
config:
event_time: my_time_field

И это все!

Когда вы запускаете модель, каждый пакет шаблонирует отдельный запрос. Например, если вы запускали модель 1 октября, dbt шаблонировал бы отдельные запросы для каждого дня между 28 сентября и 1 октября включительно — всего четыре пакета.

Запрос для 2024-10-01 выглядел бы так:

target/compiled/staging/stg_events.sql
select * from (
select * from "analytics"."stg_events"
where my_time_field >= '2024-10-01 00:00:00'
and my_time_field < '2024-10-02 00:00:00'
)

На основе вашей платформы данных dbt выберет наиболее эффективный атомарный механизм для вставки, обновления или замены этих четырех пакетов (2024-09-28, 2024-09-29, 2024-09-30 и 2024-10-01) в существующей таблице.

Нашли ошибку?

0
Loading