Конфигурации Apache Spark
dbt-databricksЕсли вы используете Databricks, рекомендуется использовать адаптер dbt-databricks вместо dbt-spark. Если вы все еще используете dbt-spark с Databricks, рассмотрите возможность миграции с адаптера dbt-spark на адаптер dbt-databricks.
Для версии этой страницы для Databricks обратитесь к разделу Настройка Databricks.
См. Конфигурация Databricks для версии этой страницы для Databricks.
Конфигурация таблиц
При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-spark, в дополнение к стандартным конфигурациям модели.
| Опция | Описание | Обязательна? | Пример |
|---|---|---|---|
| file_format | Формат файла, используемый при создании таблиц (parquet, delta, iceberg, hudi, csv, json, text, jdbc, orc, hive или libsvm). | Необязательная | parquet |
| location_root | Созданная таблица использует указанный каталог для хранения своих данных. К нему добавляется псевдоним таблицы. | Необязательная | /mnt/root |
| partition_by | Разделить созданную таблицу по указанным столбцам. Для каждого раздела создается каталог. | Необязательная | date_day |
| clustered_by | Каждый раздел в созданной таблице будет разделен на фиксированное количество корзин по указанным столбцам. | Необязательная | country_code |
| buckets | Количество корзин, создаваемых при кластеризации | Обязательна, если указано clustered_by | 8 |
Инкрементальные модели
dbt стремится предложить полезные, интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций. Поскольку существует так много вариаций между кластерами Apache Spark в мире, не говоря уже о мощных функциях, предлагаемых пользователям Databricks форматом файлов Delta и пользовательским временем выполнения, понимание всех доступных опций само по себе является задачей.
В качестве альтернативы, вы можете использовать формат файлов Apache Iceberg или Apache Hudi с временем выполнения Apache Spark для построения инкрементальных моделей.
По этой причине плагин dbt-spark сильно полагается на конфигурацию incremental_strategy. Эта конфигурация указывает, как инкрементальная материализация должна строить модели в запусках после первого. Она может быть установлена на одно из трех значений:
append(по умолчанию): Вставка новых записей без обновления или перезаписи существующих данных.insert_overwrite: Если указаноpartition_by, перезаписать разделы в новыми данными. Еслиpartition_byне указано, перезаписать всю таблицу новыми данными.merge(только для форматов файлов Delta, Iceberg и Hudi): Сопоставление записей на основеunique_key; обновление старых записей, вставка новых. (Еслиunique_keyне указан, все новые данные вставляются, аналогичноappend.)microbatchРеализует стратегию микропакетов с использованиемevent_timeдля определения временных диапазонов для фильтрации данных.
Каждая из этих стратегий имеет свои плюсы и минусы, которые мы обсудим ниже. Как и в случае любой конфигурации модели, incremental_strategy может быть указан в dbt_project.yml или в блоке config() файла модели.
Стратегия append
Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в ее простоте и функциональности на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.
Указание append в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда ничего не указано.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
create temporary view spark_incremental__dbt_tmp as
select * from analytics.events
where event_ts >= (select max(event_ts) from {{ this }})
;
insert into table analytics.spark_incremental
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия insert_overwrite
Эта стратегия наиболее эффективна, когда указана вместе с клаузой partition_by в конфигурации вашей модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все разделы, включенные в ваш запрос. Убедитесь, что вы повторно выбираете все соответствующие данные для раздела при использовании этой инкрементальной стратегии.
Если partition_by не указано, то стратегия insert_overwrite атомарно заменит все содержимое таблицы, перезаписывая все существующие данные только новыми записями. Однако схема столбцов таблицы остается прежней. Это может быть желательным в некоторых ограниченных обстоятельствах, так как это минимизирует время простоя при перезаписи содержимого таблицы. Операция сопоставима с выполнением truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table (которая выполняет create or replace) вместо этого.
Примечания по использованию:
- Эта стратегия не поддерживается для таблиц с
file_format: delta. - Эта стратегия недоступна при подключении через SQL-эндпоинты Databricks (
method: odbc+endpoint). - Если вы подключаетесь через кластер Databricks + драйвер ODBC (
method: odbc+cluster), вы должны включитьset spark.sql.sources.partitionOverwriteMode DYNAMICв конфигурации Spark кластера, чтобы динамическая замена разделов работала (incremental_strategy: insert_overwrite+partition_by).
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}
/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from events
group by 1
create temporary view spark_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from events
group by 1
;
insert overwrite table analytics.spark_incremental
partition (date_day)
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия merge
Примечания по использованию: Инкрементальная стратегия merge требует:
file_format: delta, iceberg или hudi- Databricks Runtime 5.1 и выше для формата файлов delta
- Apache Spark для формата файлов Iceberg или Hudi
dbt выполнит атомарный оператор merge, который выглядит почти идентично поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt откажется от критериев совпадения и просто вставит все новые записи (аналогично стратегии append).
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta', # или 'iceberg' или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view merge_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
merge into analytics.merge_incremental as DBT_INTERNAL_DEST
using merge_incremental__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *
Сохранение описаний моделей
Поддержка сохранения документов на уровне отношений доступна в dbt версии 0.17.0. Для получения дополнительной информации о настройке сохранения документов см. документацию.
Когда опция persist_docs настроена должным образом, вы сможете
увидеть описания моделей в поле Comment команды describe [table] extended
или show table extended in [database] like '*'.
Всегда schema, никогда database
Apache Spark использует термины "schema" и "database" взаимозаменяемо. dbt понимает
database как существующий на более высоком уровне, чем schema. Таким образом, вы никогда
не должны использовать или устанавливать database в качестве конфигурации узла или в целевом профиле при запуске dbt-spark.
Если вы хотите управлять схемой/базой данных, в которой dbt будет материализовать модели,
используйте конфигурацию schema и макрос generate_schema_name только.
Конфигурации формата файлов по умолчанию
Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как
снимки и инкрементальная стратегия merge, вы захотите
использовать формат файлов Delta, Iceberg или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.
Это довольно удобно сделать, установив конфигурацию верхнего уровня в вашем файле проекта:
models:
+file_format: delta # или iceberg или hudi
seeds:
+file_format: delta # или iceberg или hudi
snapshots:
+file_format: delta # или iceberg или hudi
