Конфигурации Apache Spark
dbt-databricksЕсли вы используете Databricks, рекомендуется использовать адаптер dbt-databricks вместо dbt-spark. Если вы все еще используете dbt-spark с Databricks, рассмотрите возможность миграции с адаптера dbt-spark на адаптер dbt-databricks.
Для версии этой страницы для Databricks обратитесь к разделу Настройка Databricks.
См. Конфигурация Databricks для версии этой страницы для Databricks.
Конфигурация таблиц
При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-spark, в дополнение к стандартным конфигурациям модели.
| Loading table... |
Инкрементальные модели
dbt стремится предложить полезные, интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций. Поскольку существует так много вариаций между кластерами Apache Spark в мире, не говоря уже о мощных функциях, предлагаемых пользователям Databricks форматом файлов Delta и пользовательским временем выполнения, понимание всех доступных опций само по себе является задачей.
В качестве альтернативы, вы можете использовать формат файлов Apache Iceberg или Apache Hudi с временем выполнения Apache Spark для построения инкрементальных моделей.
По этой причине плагин dbt-spark сильно полагается на конфигурацию incremental_strategy. Эта конфигурация указывает, как инкрементальная материализация должна строить модели в запусках после первого. Она может быть установлена на одно из трех значений:
append(по умолчанию): Вставка новых записей без обновления или перезаписи существующих данных.insert_overwrite: Если указаноpartition_by, перезаписать разделы в table новыми данными. Еслиpartition_byне указано, перезаписать всю таблицу новыми данными.merge(только для форматов файлов Delta, Iceberg и Hudi): Сопоставление записей на основеunique_key; обновление старых записей, вставка новых. (Еслиunique_keyне указан, все новые данные вставляются, аналогичноappend.)microbatchРеализует стратегию микропакетов с использованиемevent_timeдля определения временных диапазонов для фильтрации данных.
Каждая из этих стратегий имеет свои плюсы и минусы, которые мы обсудим ниже. Как и в случае любой конфигурации модели, incremental_strategy может быть указан в dbt_project.yml или в блоке config() файла модели.
Стратегия append
Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в ее простоте и функциональности на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.
Указание append в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда ничего не указано.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
create temporary view spark_incremental__dbt_tmp as
select * from analytics.events
where event_ts >= (select max(event_ts) from {{ this }})
;
insert into table analytics.spark_incremental
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия insert_overwrite
Эта стратегия наиболее эффективна, когда она указана вместе с параметром partition_by в конфигурации модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все партиции, попавшие в ваш запрос. При использовании этой инкрементальной стратегии обязательно повторно выбирайте (re-select) все релевантные данные для каждой партиции.
Если partition_by не указано, то стратегия insert_overwrite атомарно заменит все содержимое таблицы, перезаписывая все существующие данные только новыми записями. Однако схема столбцов таблицы остается прежней. Это может быть желательным в некоторых ограниченных обстоятельствах, так как это минимизирует время простоя при перезаписи содержимого таблицы. Операция сопоставима с выполнением truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table (которая выполняет create or replace) вместо этого.
Примечания по использованию:
- Эта стратегия не поддерживается для таблиц с
file_format: delta. - Эта стратегия недоступна при подключении через SQL-эндпоинты Databricks (
method: odbc+endpoint). - Если вы подключаетесь через кластер Databricks + драйвер ODBC (
method: odbc+cluster), вы должны включитьset spark.sql.sources.partitionOverwriteMode DYNAMICв конфигурации Spark кластера, чтобы динамическая замена разделов работала (incremental_strategy: insert_overwrite+partition_by).
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet',
incremental_strategy='insert_overwrite'
) }}
/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from events
group by 1
create temporary view spark_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from events
group by 1
;
insert overwrite table analytics.spark_incremental
partition (date_day)
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия merge
Примечания по использованию: Инкрементальная стратегия merge требует:
file_format: delta, iceberg или hudi- Databricks Runtime 5.1 и выше для формата файлов delta
- Apache Spark для формата файлов Iceberg или Hudi
dbt выполнит атомарный оператор merge, который выглядит почти идентично поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt откажется от критериев совпадения и просто вставит все новые записи (аналогично стратегии append).
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta', # или 'iceberg' или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view merge_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
merge into analytics.merge_incremental as DBT_INTERNAL_DEST
using merge_incremental__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *
Сохранение описаний моделей
В dbt поддерживается сохранение документации на уровне relation.
Подробнее о настройке сохранения документации см. документацию.
Когда опция persist_docs настроена должным образом, вы сможете
увидеть описания моделей в поле Comment команды describe [table] extended
или show table extended in [database] like '*'.
Всегда schema, никогда database
Apache Spark использует термины "schema" и "database" взаимозаменяемо. dbt понимает
database как существующий на более высоком уровне, чем schema. Таким образом, вы никогда
не должны использовать или устанавливать database в качестве конфигурации узла или в целевом профиле при запуске dbt-spark.
Если вы хотите управлять схемой/базой данных, в которой dbt будет материализовать модели,
используйте конфигурацию schema и макрос generate_schema_name только.
Конфигурации формата файлов по умолчанию
Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как
снимки и инкрементальная стратегия merge, вы захотите
использовать формат файлов Delta, Iceberg или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.
Это довольно удобно сделать, установив конфигурацию верхнего уровня в вашем файле проекта:
models:
+file_format: delta # или iceberg или hudi
seeds:
+file_format: delta # или iceberg или hudi
snapshots:
+file_format: delta # или iceberg или hudi
Footnotes
-
Если вы настраиваете
location_root, dbt указывает путь размещения в выраженииcreate table. Это изменяет тип таблицы с «managed» на «external» в Spark/Databricks. ↩
