Перейти к основному содержимому

Конфигурации Microsoft Fabric Spark

Конфигурация таблиц

При материализации модели как table вы можете указать несколько необязательных конфигураций, специфичных для плагина dbt-spark, в дополнение к стандартным настройкам моделей.

ПараметрОписаниеОбязательно?
Пример
file_formatФормат файла, который будет использоваться при создании таблиц (parquet, delta, csv).Optionaldelta
location_root 1Указанный каталог, используемый для хранения данных таблицы. К нему добавляется алиас таблицы.OptionalFiles/<folder> or Tables/<tableName>
partition_byРазбиение таблицы по указанным колонкам. Для каждого раздела создаётся отдельный каталог.Optionaldate_day
clustered_byКаждый раздел таблицы будет разбит на фиксированное количество бакетов по указанным колонкам.Optionalcountry_code
bucketsКоличество бакетов, создаваемых при кластеризации.Required if clustered_by is specified8
tblpropertiesСвойства таблицы, которые настраивают её поведение. Набор свойств зависит от формата файла, см. справочную документацию (Parquet, Delta).OptionalProvider=delta Location=abfss://.../Files/tables/sales_data TableProperty.created.by=data_engineering_team TableProperty.purpose=sales analytics CreatedBy=Delta Lake CreatedAt=2024-12-01 14:21:00 Format=Parquet PartitionColumns=region MinReaderVersion=1 MinWriterVersion=2
Loading table...

Инкрементальные модели

dbt стремится предоставлять удобные и интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций. Поскольку в мире существует большое разнообразие Spark‑кластеров — не говоря уже о мощных возможностях, доступных пользователям open source благодаря формату Delta и кастомным runtime, — разобраться во всех доступных опциях само по себе является нетривиальной задачей.

По этой причине плагин dbt-fabricspark в значительной степени опирается на конфигурацию incremental_strategy. Эта настройка указывает инкрементальной материализации, как собирать модели при запусках после первого. Она может принимать одно из следующих значений:

  • append (по умолчанию): вставлять новые записи без обновления или перезаписи существующих данных.
  • insert_overwrite: если указано partition_by, перезаписывать разделы в table новыми данными. Если partition_by не указано, перезаписывать всю таблицу новыми данными.
  • merge (только формат Delta): сопоставлять записи по unique_key, обновлять старые записи и вставлять новые. (Если unique_key не указан, все новые данные будут вставлены, аналогично стратегии append.)
  • microbatch: реализует стратегию microbatch, используя event_time для определения временных диапазонов фильтрации данных.

У каждой из этих стратегий есть свои плюсы и минусы, которые мы рассмотрим ниже. Как и любую другую конфигурацию модели, incremental_strategy можно указывать в dbt_project.yml или внутри блока config() в файле модели.

Стратегия append

При использовании стратегии append dbt выполняет оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в её простоте и работоспособности на всех платформах, типах файлов, способах подключения и версиях Fabric Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому для многих источников данных она, скорее всего, будет приводить к вставке дубликатов.

Указывать append в качестве инкрементальной стратегии необязательно, так как это стратегия по умолчанию, если явно ничего не задано.

fabricspark_incremental.sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

Стратегия insert_overwrite

Эта стратегия наиболее эффективна при использовании совместно с параметром partition_by в конфигурации модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все разделы, затронутые запросом. При использовании этой инкрементальной стратегии обязательно повторно выбирайте все релевантные данные для каждого раздела.

Если partition_by не указан, стратегия insert_overwrite атомарно заменит всё содержимое таблицы, перезаписав все существующие данные только новыми записями. При этом схема колонок таблицы остаётся неизменной. В некоторых ограниченных случаях это может быть желаемым поведением, так как минимизирует простой во время перезаписи содержимого таблицы. Такая операция сопоставима с выполнением truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta вместо этого используйте материализацию table (которая выполняет create or replace).

Примечания по использованию:

  • Эта стратегия не поддерживается для таблиц с file_format: delta.
fabricspark_incremental.sql
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}

/*
Every partition returned by this query will be overwritten
when this model runs
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from events
group by 1

Стратегия merge

Примечания по использованию: инкрементальная стратегия merge требует:

  • file_format: delta
  • Fabric Spark Runtime версии 3.0 и выше для формата Delta

dbt выполнит атомарный оператор merge, который по своему виду и поведению практически идентичен стандартному merge в Fabric Warehouse, SQL‑базах данных, Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt будет обновлять старые записи значениями из новых записей, совпадающих по ключу. Если unique_key не указан, dbt не будет использовать условия сопоставления и просто вставит все новые записи (аналогично стратегии append).

merge_incremental.sql
{{ config(
materialized='incremental',
file_format='delta',
unique_key='user_id',
incremental_strategy='merge'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Сохранение описаний моделей

dbt поддерживает сохранение документации на уровне отношений. Подробнее о настройке сохранения документации см. документацию.

При корректной настройке параметра persist_docs вы сможете видеть описания моделей в поле Comment результатов команд describe [table] extended или show table extended in [database] like '*'.

Всегда schema, никогда database

Fabric Spark использует термины «schema» и «database» как взаимозаменяемые. dbt же понимает database как уровень, находящийся выше schema. Поэтому при использовании dbt-fabricspark вам никогда не следует использовать или задавать database ни в конфигурации узлов, ни в целевом профиле. Привыкайте: адаптер не поддерживает схемы внутри Lakehouse.

Конфигурации формата файлов по умолчанию

Чтобы получить доступ к расширенным возможностям инкрементальных стратегий, таким как snapshots и инкрементальная стратегия merge, рекомендуется использовать формат Delta в качестве формата файлов по умолчанию при материализации моделей в таблицы.

Это удобно сделать, задав конфигурацию верхнего уровня в файле проекта:

dbt_project.yml
models:
+file_format: delta

seeds:
+file_format: delta

snapshots:
+file_format: delta

Footnotes

  1. Если вы настраиваете location_root, dbt указывает путь расположения в операторе create table. Это переводит таблицу из состояния «managed» в «external» в Fabric Lakehouse.

Нашли ошибку?

0
Loading