Конфигурации Microsoft Fabric Spark
Конфигурация таблиц
При материализации модели как table вы можете указать несколько необязательных конфигураций, специфичных для плагина dbt-spark, в дополнение к стандартным настройкам моделей.
| Loading table... |
Инкрементальные модели
dbt стремится предоставлять удобные и интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций. Поскольку в мире существует большое разнообразие Spark‑кластеров — не говоря уже о мощных возможностях, доступных пользователям open source благодаря формату Delta и кастомным runtime, — разобраться во всех доступных опциях само по себе является нетривиальной задачей.
По этой причине плагин dbt-fabricspark в значительной степени опирается на конфигурацию incremental_strategy. Эта настройка указывает инкрементальной материализации, как собирать модели при запусках после первого. Она может принимать одно из следующих значений:
append(по умолчанию): вставлять новые записи без обновления или перезаписи существующих данных.insert_overwrite: если указаноpartition_by, перезаписывать разделы в table новыми данными. Еслиpartition_byне указано, перезаписывать всю таблицу новыми данными.merge(только формат Delta): сопоставлять записи поunique_key, обновлять старые записи и вставлять новые. (Еслиunique_keyне указан, все новые данные будут вставлены, аналогично стратегииappend.)microbatch: реализует стратегию microbatch, используяevent_timeдля определения временных диапазонов фильтрации данных.
У каждой из этих стратегий есть свои плюсы и минусы, которые мы рассмотрим ниже. Как и любую другую конфигурацию модели, incremental_strategy можно указывать в dbt_project.yml или внутри блока config() в файле модели.
Стратегия append
При использовании стратегии append dbt выполняет оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в её простоте и работоспособности на всех платформах, типах файлов, способах подключения и версиях Fabric Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому для многих источников данных она, скорее всего, будет приводить к вставке дубликатов.
Указывать append в качестве инкрементальной стратегии необязательно, так как это стратегия по умолчанию, если явно ничего не задано.
- Source code
- Run code
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- All rows returned by this query will be appended to the existing table
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
create temporary view fabricspark_incremental__dbt_tmp as
select * from analytics.events
where event_ts >= (select max(event_ts) from {{ this }})
;
insert into table analytics.fabricspark_incremental
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия insert_overwrite
Эта стратегия наиболее эффективна при использовании совместно с параметром partition_by в конфигурации модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все разделы, затронутые запросом. При использовании этой инкрементальной стратегии обязательно повторно выбирайте все релевантные данные для каждого раздела.
Если partition_by не указан, стратегия insert_overwrite атомарно заменит всё содержимое таблицы, перезаписав все существующие данные только новыми записями. При этом схема колонок таблицы остаётся неизменной. В некоторых ограниченных случаях это может быть желаемым поведением, так как минимизирует простой во время перезаписи содержимого таблицы. Такая операция сопоставима с выполнением truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta вместо этого используйте материализацию table (которая выполняет create or replace).
Примечания по использованию:
- Эта стратегия не поддерживается для таблиц с
file_format: delta.
- Source code
- Run code
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}
/*
Every partition returned by this query will be overwritten
when this model runs
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from events
group by 1
create temporary view fabricspark_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from events
group by 1
;
insert overwrite table analytics.fabricspark_incremental
partition (date_day)
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия merge
Примечания по использованию: инкрементальная стратегия merge требует:
file_format: delta- Fabric Spark Runtime версии 3.0 и выше для формата Delta
dbt выполнит атомарный оператор merge, который по своему виду и поведению практически идентичен стандартному merge в Fabric Warehouse, SQL‑базах данных, Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt будет обновлять старые записи значениями из новых записей, совпадающих по ключу. Если unique_key не указан, dbt не будет использовать условия сопоставления и просто вставит все новые записи (аналогично стратегии append).
- Source code
- Run code
{{ config(
materialized='incremental',
file_format='delta',
unique_key='user_id',
incremental_strategy='merge'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view merge_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
merge into analytics.merge_incremental as DBT_INTERNAL_DEST
using merge_incremental__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *
Сохранение описаний моделей
dbt поддерживает сохранение документации на уровне отношений. Подробнее о настройке сохранения документации см. документацию.
При корректной настройке параметра persist_docs вы сможете
видеть описания моделей в поле Comment результатов команд
describe [table] extended или show table extended in [database] like '*'.
Всегда schema, никогда database
Fabric Spark использует термины «schema» и «database» как взаимозаменяемые. dbt же понимает
database как уровень, находящийся выше schema. Поэтому при использовании dbt-fabricspark
вам никогда не следует использовать или задавать database ни в конфигурации узлов, ни в целевом профиле.
Привыкайте: адаптер не поддерживает схемы внутри Lakehouse.
Конфигурации формата файлов по умолчанию
Чтобы получить доступ к расширенным возможностям инкрементальных стратегий, таким как
snapshots и инкрементальная стратегия merge, рекомендуется
использовать формат Delta в качестве формата файлов по умолчанию при материализации моделей в таблицы.
Это удобно сделать, задав конфигурацию верхнего уровня в файле проекта:
models:
+file_format: delta
seeds:
+file_format: delta
snapshots:
+file_format: delta
Footnotes
-
Если вы настраиваете
location_root, dbt указывает путь расположения в оператореcreate table. Это переводит таблицу из состояния «managed» в «external» в Fabric Lakehouse. ↩