Перейти к основному содержимому

Конфигурации Apache Spark

Если вы используете Databricks, используйте dbt-databricks

Если вы используете Databricks, рекомендуется использовать адаптер dbt-databricks вместо dbt-spark. Если вы все еще используете dbt-spark с Databricks, рассмотрите возможность миграции с адаптера dbt-spark на адаптер dbt-databricks.

Для версии этой страницы для Databricks обратитесь к разделу Настройка Databricks.

примечание

См. Конфигурация Databricks для версии этой страницы для Databricks.

Конфигурация таблиц

При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-spark, в дополнение к стандартным конфигурациям модели.

ОпцияОписаниеОбязательна?Пример
file_formatФормат файла, используемый при создании таблиц (parquet, delta, iceberg, hudi, csv, json, text, jdbc, orc, hive или libsvm).Необязательнаяparquet
location_rootСозданная таблица использует указанный каталог для хранения своих данных. К нему добавляется псевдоним таблицы.Необязательная/mnt/root
partition_byРазделить созданную таблицу по указанным столбцам. Для каждого раздела создается каталог.Необязательнаяdate_day
clustered_byКаждый раздел в созданной таблице будет разделен на фиксированное количество корзин по указанным столбцам.Необязательнаяcountry_code
bucketsКоличество корзин, создаваемых при кластеризацииОбязательна, если указано clustered_by8

Инкрементальные модели

dbt стремится предложить полезные, интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций. Поскольку существует так много вариаций между кластерами Apache Spark в мире, не говоря уже о мощных функциях, предлагаемых пользователям Databricks форматом файлов Delta и пользовательским временем выполнения, понимание всех доступных опций само по себе является задачей.

В качестве альтернативы, вы можете использовать формат файлов Apache Iceberg или Apache Hudi с временем выполнения Apache Spark для построения инкрементальных моделей.

По этой причине плагин dbt-spark сильно полагается на конфигурацию incremental_strategy. Эта конфигурация указывает, как инкрементальная материализация должна строить модели в запусках после первого. Она может быть установлена на одно из трех значений:

  • append (по умолчанию): Вставка новых записей без обновления или перезаписи существующих данных.
  • insert_overwrite: Если указано partition_by, перезаписать разделы в новыми данными. Если partition_by не указано, перезаписать всю таблицу новыми данными.
  • merge (только для форматов файлов Delta, Iceberg и Hudi): Сопоставление записей на основе unique_key; обновление старых записей, вставка новых. (Если unique_key не указан, все новые данные вставляются, аналогично append.)
  • microbatch Реализует стратегию микропакетов с использованием event_time для определения временных диапазонов для фильтрации данных.

Каждая из этих стратегий имеет свои плюсы и минусы, которые мы обсудим ниже. Как и в случае любой конфигурации модели, incremental_strategy может быть указан в dbt_project.yml или в блоке config() файла модели.

Стратегия append

Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в ее простоте и функциональности на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.

Указание append в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда ничего не указано.

spark_incremental.sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

Стратегия insert_overwrite

Эта стратегия наиболее эффективна, когда указана вместе с клаузой partition_by в конфигурации вашей модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все разделы, включенные в ваш запрос. Убедитесь, что вы повторно выбираете все соответствующие данные для раздела при использовании этой инкрементальной стратегии.

Если partition_by не указано, то стратегия insert_overwrite атомарно заменит все содержимое таблицы, перезаписывая все существующие данные только новыми записями. Однако схема столбцов таблицы остается прежней. Это может быть желательным в некоторых ограниченных обстоятельствах, так как это минимизирует время простоя при перезаписи содержимого таблицы. Операция сопоставима с выполнением truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table (которая выполняет create or replace) вместо этого.

Примечания по использованию:

  • Эта стратегия не поддерживается для таблиц с file_format: delta.
  • Эта стратегия недоступна при подключении через SQL-эндпоинты Databricks (method: odbc + endpoint).
  • Если вы подключаетесь через кластер Databricks + драйвер ODBC (method: odbc + cluster), вы должны включить set spark.sql.sources.partitionOverwriteMode DYNAMIC в конфигурации Spark кластера, чтобы динамическая замена разделов работала (incremental_strategy: insert_overwrite + partition_by).
Кластер Databricks: Конфигурация SparkКластер Databricks: Конфигурация Spark
spark_incremental.sql
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}

/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from events
group by 1

Стратегия merge

Примечания по использованию: Инкрементальная стратегия merge требует:

  • file_format: delta, iceberg или hudi
  • Databricks Runtime 5.1 и выше для формата файлов delta
  • Apache Spark для формата файлов Iceberg или Hudi

dbt выполнит атомарный оператор merge, который выглядит почти идентично поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt откажется от критериев совпадения и просто вставит все новые записи (аналогично стратегии append).

merge_incremental.sql
{{ config(
materialized='incremental',
file_format='delta', # или 'iceberg' или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Сохранение описаний моделей

Поддержка сохранения документов на уровне отношений доступна в dbt версии 0.17.0. Для получения дополнительной информации о настройке сохранения документов см. документацию.

Когда опция persist_docs настроена должным образом, вы сможете увидеть описания моделей в поле Comment команды describe [table] extended или show table extended in [database] like '*'.

Всегда schema, никогда database

Apache Spark использует термины "schema" и "database" взаимозаменяемо. dbt понимает database как существующий на более высоком уровне, чем schema. Таким образом, вы никогда не должны использовать или устанавливать database в качестве конфигурации узла или в целевом профиле при запуске dbt-spark.

Если вы хотите управлять схемой/базой данных, в которой dbt будет материализовать модели, используйте конфигурацию schema и макрос generate_schema_name только.

Конфигурации формата файлов по умолчанию

Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как снимки и инкрементальная стратегия merge, вы захотите использовать формат файлов Delta, Iceberg или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.

Это довольно удобно сделать, установив конфигурацию верхнего уровня в вашем файле проекта:

dbt_project.yml
models:
+file_format: delta # или iceberg или hudi

seeds:
+file_format: delta # или iceberg или hudi

snapshots:
+file_format: delta # или iceberg или hudi
0