Перейти к основному содержимому

О микропакетных инкрементальных моделях beta

Microbatch

Новая стратегия microbatch доступна в бета-версии для dbt Cloud "Latest" и dbt Core v1.9.

Если вы используете пользовательский макрос microbatch, установите флаг изменения поведения в вашем dbt_project.yml, чтобы включить пакетное выполнение. Если у вас нет пользовательского макроса microbatch, вам не нужно устанавливать этот флаг, так как dbt автоматически обработает микропакетирование для любой модели, использующей стратегию microbatch.

Читайте и участвуйте в обсуждении: dbt-core#10672

Обратитесь к Поддерживаемые инкрементальные стратегии по адаптеру для списка поддерживаемых адаптеров.

Что такое "microbatch" в dbt?

Инкрементальные модели в dbt — это материализация, предназначенная для эффективного обновления таблиц вашего хранилища данных путем трансформации и загрузки новых или измененных данных с момента последнего запуска. Вместо повторной обработки всего набора данных каждый раз, инкрементальные модели обрабатывают меньшее количество строк, а затем добавляют, обновляют или заменяют эти строки в существующей таблице. Это может значительно сократить время и ресурсы, необходимые для ваших преобразований данных.

Microbatch — это инкрементальная стратегия, разработанная для больших временных рядов данных:

  • Она полагается исключительно на временную колонку (event_time) для определения временных диапазонов для фильтрации. Установите колонку event_time для вашей модели microbatch и ее прямых родителей (входящих моделей). Обратите внимание, что это отличается от partition_by, который группирует строки в разделы.

  • Она дополняет, а не заменяет существующие инкрементальные стратегии, сосредотачиваясь на эффективности и простоте пакетной обработки.

  • В отличие от традиционных инкрементальных стратегий, microbatch позволяет перепроцессировать неудачные пакеты, автоматически обнаруживать параллельное выполнение пакетов и устранять необходимость в сложной условной логике для заполнения пропусков.

  • Обратите внимание, что microbatch может не быть лучшей стратегией для всех случаев использования. Рассмотрите другие стратегии для случаев, таких как отсутствие надежной колонки event_time или если вы хотите больше контроля над инкрементальной логикой. Подробнее читайте в Как microbatch сравнивается с другими инкрементальными стратегиями.

Как работает microbatch

Когда dbt запускает модель microbatch — будь то в первый раз, во время инкрементальных запусков или в указанных заполнениях пропусков — он разделит обработку на несколько запросов (или "пакетов"), основываясь на event_time и batch_size, которые вы настроили.

Каждый "пакет" соответствует одному ограниченному временному периоду (по умолчанию, один день данных). В то время как другие инкрементальные стратегии работают только с "старыми" и "новыми" данными, модели microbatch рассматривают каждый пакет как атомарную единицу, которую можно построить или заменить самостоятельно. Каждый пакет независим и .

Это мощная абстракция, которая позволяет dbt запускать пакеты отдельно, одновременно и повторно их независимо.

Пример

Модель sessions агрегирует и обогащает данные, поступающие из двух других моделей:

  • page_views — это большая таблица временных рядов. Она содержит много строк, новые записи почти всегда поступают после существующих, и существующие записи редко обновляются. Она использует колонку page_view_start в качестве event_time.
  • customers — это относительно небольшая размерная таблица. Атрибуты клиентов часто обновляются и не в временном порядке — то есть, старые клиенты с такой же вероятностью могут изменить значения колонок, как и новые клиенты. Модель customers не настраивает колонку event_time.

В результате:

  • Каждый пакет sessions будет фильтровать page_views до эквивалентного временно ограниченного пакета.
  • Таблица customers не фильтруется, что приводит к полному сканированию для каждого пакета.
подсказка

В дополнение к настройке event_time для целевой таблицы, вы также должны указать его для любых входящих моделей, которые вы хотите фильтровать, даже если у них разные временные колонки.

models/staging/page_views.yml
models:
- name: page_views
config:
event_time: page_view_start

Мы запускаем модель sessions для 1 октября 2024 года, а затем снова для 2 октября. Это приводит к следующим запросам:

event_time для модели sessions установлен на session_start, который отмечает начало сеанса пользователя на сайте. Эта настройка позволяет dbt объединять несколько просмотров страниц (каждый из которых отслеживается своими собственными временными метками page_view_start) в один сеанс. Таким образом, session_start различает время отдельных просмотров страниц от более широкого временного интервала всего сеанса пользователя.

models/sessions.sql
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='session_start',
begin='2020-01-01',
batch_size='day'
) }}

with page_views as (

-- этот ref будет автоматически отфильтрован
select * from {{ ref('page_views') }}

),

customers as (

-- этот ref не будет
select * from {{ ref('customers') }}

),

select
page_views.id as session_id,
page_views.page_view_start as session_start,
customers.*
from page_views
left join customers
on page_views.customer_id = customer.id

dbt даст указание платформе данных взять результат каждого пакетного запроса и вставить, обновить или заменить содержимое таблицы analytics.sessions для того же дня данных. Для выполнения этой операции dbt использует наиболее эффективный атомарный механизм для "полной пакетной" замены, доступный на каждой платформе данных.

Не имеет значения, содержит ли таблица уже данные за этот день. При одинаковых входных данных результирующая таблица будет одинаковой, независимо от того, сколько раз пакет перепроцессируется.

Каждый паке�т сеансов фильтрует page_views до соответствующего временно ограниченного пакета, но не фильтрует сеансы, выполняя полное сканирование для каждого пакета.Каждый пакет сеансов фильтрует page_views до соответствующего временно ограниченного пакета, но не фильтрует сеансы, выполняя полное сканирование для каждого пакета.

Соответствующие конфигурации

Несколько конфигураций имеют отношение к моделям microbatch, и некоторые из них обязательны:

КонфигурацияОписаниеПо умолчаниюТипОбязательно
event_timeКолонка, указывающая "в какое время произошла строка". Обязательно для вашей модели microbatch и любых прямых родителей, которые должны быть отфильтрованы.N/AКолонкаОбязательно
begin"Начало времени" для модели microbatch. Это отправная точка для любых начальных или полных обновлений. Например, модель microbatch с дневной зернистостью, запущенная 1 октября 2024 года с begin = '2023-10-01, обработает 366 пакетов (это високосный год!) плюс пакет для "сегодня".N/AДатаОбязательно
batch_sizeГранулярность ваших пакетов. Поддерживаемые значения: hour, day, month, и yearN/AСтрокаОбязательно
lookbackОбработать X пакетов перед последней закладкой, чтобы захватить записи, поступившие с опозданием.1Целое числоНеобязательно
concurrent_batchesПереопределяет автоматическое обнаружение dbt для выполнения пакетов одновременно (в одно и то же время). Подробнее о настройке параллельных пакетов. Установка на
* true выполняет пакеты одновременно (параллельно).
* false выполняет пакеты последовательно (один за другим).
NoneЛогическоеНеобязательно
Конфигурация колонки event_time настраивает реальное время этой записиКонфигурация колонки event_time настраивает реальное время этой записи

Обязательные конфигурации для конкретных адаптеров

Некоторые адаптеры требуют дополнительных конфигураций для стратегии microbatch. Это связано с тем, что каждый адаптер реализует стратегию microbatch по-разному.

Следующая таблица перечисляет обязательные конфигурации для конкретных адаптеров, в дополнение к стандартным конфигурациям microbatch:

АдаптерКонфигурация unique_keyКонфигурация partition_by
dbt-postgres✅ ОбязательноN/A
dbt-sparkN/A✅ Обязательно
dbt-bigqueryN/A✅ Обязательно

Например, если вы используете dbt-postgres, настройте unique_key следующим образом:

models/sessions.sql
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
unique_key='sales_id', ## обязательно для dbt-postgres
event_time='transaction_date',
begin='2023-01-01',
batch_size='day'
) }}

select
sales_id,
transaction_date,
customer_id,
product_id,
total_amount
from {{ source('sales', 'transactions') }}

В этом примере unique_key обязателен, потому что microbatch в dbt-postgres использует стратегию merge, которая требует unique_key для идентификации строк в хранилище данных, которые необходимо объединить. Без unique_key dbt не сможет сопоставить строки между входящим пакетом и существующей таблицей.

Полное обновление

Как лучшую практику, мы рекомендуем настраивать full_refresh: False на моделях microbatch, чтобы они игнорировали вызовы с флагом --full-refresh. Если вам нужно перепроцессировать исторические данные, сделайте это с помощью целевого заполнения пропусков, указывая явные начальные и конечные даты.

Использование

Вы должны написать запрос вашей модели для обработки (чтения и возврата) точно одного "пакета" данных. Это упрощающее предположение и мощное:

  • Вам не нужно думать о фильтрации is_incremental
  • Вам не нужно выбирать среди стратегий DML (вставка/объединение/замена)
  • Вы можете предварительно просмотреть вашу модель и увидеть точные записи для данного пакета, которые появятся, когда этот пакет будет обработан и записан в таблицу

Когда вы запускаете модель microbatch, dbt оценит, какие пакеты нужно загрузить, разобьет их на SQL-запрос для каждого пакета и загрузит каждый независимо.

dbt автоматически отфильтрует входные данные (source или ref), которые определяют event_time, основываясь на конфигурациях lookback и batch_size для этой модели.

Во время стандартных инкрементальных запусков dbt будет обрабатывать пакеты в соответствии с текущей временной меткой и настроенным lookback, с одним запросом на пакет.

Настройте lookback для перепроцессирования дополнительных пакетов во время стандартных инкрементальных запусковНастройте lookback для перепроцессирования дополнительных пакетов во время стандартных инкрементальных запусков

Примечание: Если есть входящая модель, которая настраивает event_time, но вы не хотите, чтобы ссылка на нее была отфильтрована, вы можете указать ref('upstream_model').render(), чтобы отказаться от автоматической фильтрации. Это не рекомендуется в общем случае — большинство моделей, которые настраивают event_time, довольно большие, и если ссылка не отфильтрована, каждый пакет будет выполнять полное сканирование этой входной таблицы.

Заполнения пропусков

Будь то исправление ошибочных исходных данных или ретроспективное применение изменения в бизнес-логике, вам может понадобиться перепроцессировать большое количество исторических данных.

Заполнение пропусков в модели microbatch так же просто, как выбрать ее для запуска или сборки и указать "начало" и "конец" для event_time. Обратите внимание, что --event-time-start и --event-time-end являются взаимно необходимыми, то есть, если вы указываете одно, вы должны указать другое.

Как всегда, dbt будет обрабатывать пакеты между началом и концом как независимые запросы.

dbt run --event-time-start "2024-09-01" --event-time-end "2024-09-04"
Настройте lookback для перепроцессирования дополнительных пакетов во время стандарт�ных инкрементальных запусковНастройте lookback для перепроцессирования дополнительных пакетов во время стандартных инкрементальных запусков

Повторная попытка

Если один или несколько ваших пакетов не удается, вы можете использовать dbt retry, чтобы перепроцессировать только неудачные пакеты.

Частичная повторная попытка

Часовые пояса

На данный момент dbt предполагает, что все предоставленные значения находятся в UTC:

  • event_time
  • begin
  • --event-time-start
  • --event-time-end

Хотя мы можем рассмотреть возможность добавления поддержки пользовательских часовых поясов в будущем, мы также считаем, что определение этих значений в UTC упрощает жизнь всем.

Параллельное выполнение пакетов

Стратегия microbatch предлагает преимущество обновления модели в меньших, более управляемых пакетах. В зависимости от вашего случая использования, настройка ваших моделей microbatch для выполнения в параллельном режиме предлагает более быстрое выполнение по сравнению с последовательным выполнением пакетов.

Параллельное выполнение пакетов означает, что несколько пакетов обрабатываются одновременно, а не один за другим (последовательно) для более быстрого выполнения ваших моделей microbatch.

dbt автоматически определяет, может ли пакет выполняться параллельно в большинстве случаев, что означает, что вам не нужно настраивать эту настройку. Однако конфигурация concurrent_batches доступна как переопределение (не как ограничение), позволяя вам указать, должны ли пакеты выполняться параллельно или нет в конкретных случаях.

Например, если у вас есть модель microbatch с 12 пакетами, вы можете выполнить эти пакеты параллельно. Конкретно они будут выполняться параллельно, ограниченные количеством доступных потоков.

Предварительные условия

Чтобы включить параллельное выполнение, вы должны:

  • Использовать поддерживаемый адаптер:
    • Snowflake
    • Databricks
    • Скоро появятся другие адаптеры!
      • Мы продолжим тестировать и добавлять поддержку параллельности для адаптеров. Это означает, что некоторые адаптеры могут получить поддержку параллельности после первоначального выпуска 1.9.
  • Соответствовать дополнительным условиям, описанным в следующем разделе.

Как работает параллельное выполнение пакетов

Пакет может выполняться параллельно, только если все эти условия выполнены:

УсловиеПараллельное выполнениеПоследовательное выполнение
Не первый пакет-
Не последний пакет-
Адаптер поддерживает параллельные пакеты-

После проверки условий в предыдущей таблице — и если значение concurrent_batches не установлено, dbt будет интеллектуально автоматически определять, вызывает ли модель функцию Jinja {{ this }}. Если она ссылается на {{ this }}, пакеты будут выполняться последовательно, так как {{ this }} представляет базу данных текущей модели, и ссылка на одно и то же отношение вызывает конфликт.

В противном случае, если {{ this }} не обнаружен (и другие условия выполнены), пакеты будут выполняться параллельно, что можно переопределить, когда вы устанавливаете значение для concurrent_batches.

Параллельное или последовательное выполнение

Выбор между параллельным выполнением пакетов и последовательной обработкой зависит от конкретных требований вашего случая использования.

  • Параллельное выполнение пакетов быстрее, но требует логики, независимой от порядка выполнения пакетов. Например, если вы разрабатываете конвейер данных для системы, которая обрабатывает пользовательские транзакции в пакетах, каждый пакет выполняется параллельно для лучшей производительности. Однако логика, используемая для обработки каждой транзакции, не должна зависеть от порядка выполнения или завершения пакетов.
  • Последовательная обработка медленнее, но необходима для расчетов, таких как накопительные метрики в моделях microbatch. Она обрабатывает данные в правильном порядке, позволяя каждому шагу основываться на предыдущем.

Настройка concurrent_batches

По умолчанию dbt автоматически определяет, могут ли пакеты выполняться параллельно для моделей microbatch, и это работает правильно в большинстве случаев. Однако вы можете переопределить определение dbt, установив конфигурацию concurrent_batches в вашем dbt_project.yml или файле модели .sql, чтобы указать параллельное или последовательное выполнение, если вы соответствуете всем условиям:

dbt_project.yml
models:
+concurrent_batches: true # значение установлено на true для выполнения пакетов параллельно

Как microbatch сравнивается с другими инкрементальными стратегиями

По мере того, как хранилища данных внедряют новые операции для одновременной замены/объединения разделов данных, мы можем обнаружить, что новая операция для хранилища данных более эффективна, чем то, что адаптер использует для microbatch. В таких случаях мы оставляем за собой право обновить стандартную операцию для microbatch, при условии, что она работает как задумано/документировано для моделей, соответствующих парадигме microbatch.

Большинство инкрементальных моделей полагаются на конечного пользователя (вас), чтобы явно указать dbt, что означает "новый" в контексте каждой модели, написав фильтр в условном блоке {% if is_incremental() %}. Вы несете ответственность за создание этого SQL таким образом, чтобы он запрашивал {{ this }} для проверки, когда последняя запись была загружена, с необязательным окном обратного просмотра для записей, поступивших с опозданием.

Другие инкрементальные стратегии будут контролировать как данные добавляются в таблицу — будь то только добавление insert, delete + insert, merge, insert overwrite и т.д. — но у всех них есть это общее.

В качестве примера:

{{
config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='date_day'
)
}}

select * from {{ ref('stg_events') }}

{% if is_incremental() %}
-- этот фильтр будет применен только при инкрементальном запуске
-- добавьте окно обратного просмотра в 3 дня, чтобы учесть записи, поступившие с опозданием
where date_day >= (select {{ dbt.dateadd("day", -3, "max(date_day)") }} from {{ this }})
{% endif %}

Для этой инкрементальной модели:

  • "Новые" записи — это те, у которых date_day больше, чем максимальный date_day, который был загружен ранее
  • Окно обратного просмотра составляет 3 дня
  • Когда есть новые записи для данного date_day, существующие данные для date_day удаляются, и новые данные вставляются

Давайте возьмем наш предыдущий пример и вместо этого используем новую инкрементальную стратегию microbatch:

models/staging/stg_events.sql
{{
config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_occured_at',
batch_size='day',
lookback=3,
begin='2020-01-01',
full_refresh=false
)
}}

select * from {{ ref('stg_events') }} -- этот ref будет автоматически отфильтрован

Где вы также установили event_time для прямых родителей модели - в данном случае, stg_events:

models/staging/stg_events.yml
models:
- name: stg_events
config:
event_time: my_time_field

И это все!

Когда вы запускаете модель, каждый пакет шаблонирует отдельный запрос. Например, если вы запускали модель 1 октября, dbt шаблонировал бы отдельные запросы для каждого дня между 28 сентября и 1 октября включительно — всего четыре пакета.

Запрос для 2024-10-01 выглядел бы так:

target/compiled/staging/stg_events.sql
select * from (
select * from "analytics"."stg_events"
where my_time_field >= '2024-10-01 00:00:00'
and my_time_field < '2024-10-02 00:00:00'
)

На основе вашей платформы данных dbt выберет наиболее эффективный атомарный механизм для вставки, обновления или замены этих четырех пакетов (2024-09-28, 2024-09-29, 2024-09-30 и 2024-10-01) в существующей таблице.

0