Перейти к основному содержимому

Конфигурации Apache Spark

Если вы используете Databricks, используйте dbt-databricks

Если вы используете Databricks, рекомендуется использовать адаптер dbt-databricks вместо dbt-spark. Если вы все еще используете dbt-spark с Databricks, рассмотрите возможность миграции с адаптера dbt-spark на адаптер dbt-databricks.

Для версии этой страницы для Databricks обратитесь к разделу Настройка Databricks.

примечание

См. Конфигурация Databricks для версии этой страницы для Databricks.

Конфигурация таблиц

При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-spark, в дополнение к стандартным конфигурациям модели.

OptionDescriptionRequired?
Example
file_formatФормат файла, который будет использоваться при создании таблиц (parquet, delta, iceberg, hudi, csv, json, text, jdbc, orc, hive или libsvm).Optionalparquet
location_root 1Создаваемая таблица использует указанный каталог для хранения своих данных. К этому пути будет добавлен алиас таблицы.Optional/mnt/root
partition_byРазбивает создаваемую таблицу на партиции по указанным столбцам. Для каждой партиции создаётся отдельный каталог.Optionaldate_day
clustered_byКаждая партиция в создаваемой таблице будет дополнительно разбита на фиксированное количество бакетов по указанным столбцам.Optionalcountry_code
bucketsКоличество бакетов, которые будут созданы при кластеризации.Required if clustered_by is specified8
tblpropertiesСвойства таблицы, которые настраивают её поведение. Набор доступных свойств зависит от формата файла, см. справочную документацию (Iceberg, Parquet, Delta, Hudi).Optional# Iceberg Example
tblproperties:
  read.split.target-size: 268435456
  commit.retry.num-retries: 10
Loading table...

Инкрементальные модели

dbt стремится предложить полезные, интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций. Поскольку существует так много вариаций между кластерами Apache Spark в мире, не говоря уже о мощных функциях, предлагаемых пользователям Databricks форматом файлов Delta и пользовательским временем выполнения, понимание всех доступных опций само по себе является задачей.

В качестве альтернативы, вы можете использовать формат файлов Apache Iceberg или Apache Hudi с временем выполнения Apache Spark для построения инкрементальных моделей.

По этой причине плагин dbt-spark сильно полагается на конфигурацию incremental_strategy. Эта конфигурация указывает, как инкрементальная материализация должна строить модели в запусках после первого. Она может быть установлена на одно из трех значений:

  • append (по умолчанию): Вставка новых записей без обновления или перезаписи существующих данных.
  • insert_overwrite: Если указано partition_by, перезаписать разделы в table новыми данными. Если partition_by не указано, перезаписать всю таблицу новыми данными.
  • merge (только для форматов файлов Delta, Iceberg и Hudi): Сопоставление записей на основе unique_key; обновление старых записей, вставка новых. (Если unique_key не указан, все новые данные вставляются, аналогично append.)
  • microbatch Реализует стратегию микропакетов с использованием event_time для определения временных диапазонов для фильтрации данных.

Каждая из этих стратегий имеет свои плюсы и минусы, которые мы обсудим ниже. Как и в случае любой конфигурации модели, incremental_strategy может быть указан в dbt_project.yml или в блоке config() файла модели.

Стратегия append

Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в ее простоте и функциональности на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.

Указание append в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда ничего не указано.

spark_incremental.sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

Стратегия insert_overwrite

Эта стратегия наиболее эффективна, когда она указана вместе с параметром partition_by в конфигурации модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все партиции, попавшие в ваш запрос. При использовании этой инкрементальной стратегии обязательно повторно выбирайте (re-select) все релевантные данные для каждой партиции.

Если partition_by не указано, то стратегия insert_overwrite атомарно заменит все содержимое таблицы, перезаписывая все существующие данные только новыми записями. Однако схема столбцов таблицы остается прежней. Это может быть желательным в некоторых ограниченных обстоятельствах, так как это минимизирует время простоя при перезаписи содержимого таблицы. Операция сопоставима с выполнением truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table (которая выполняет create or replace) вместо этого.

Примечания по использованию:

  • Эта стратегия не поддерживается для таблиц с file_format: delta.
  • Эта стратегия недоступна при подключении через SQL-эндпоинты Databricks (method: odbc + endpoint).
  • Если вы подключаетесь через кластер Databricks + драйвер ODBC (method: odbc + cluster), вы должны включить set spark.sql.sources.partitionOverwriteMode DYNAMIC в конфигурации Spark кластера, чтобы динамическая замена разделов работала (incremental_strategy: insert_overwrite + partition_by).
Кластер Databricks: Конфигурация SparkКластер Databricks: Конфигурация Spark
spark_incremental.sql
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet',
incremental_strategy='insert_overwrite'
) }}

/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from events
group by 1

Стратегия merge

Примечания по использованию: Инкрементальная стратегия merge требует:

  • file_format: delta, iceberg или hudi
  • Databricks Runtime 5.1 и выше для формата файлов delta
  • Apache Spark для формата файлов Iceberg или Hudi

dbt выполнит атомарный оператор merge, который выглядит почти идентично поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt откажется от критериев совпадения и просто вставит все новые записи (аналогично стратегии append).

merge_incremental.sql
{{ config(
materialized='incremental',
file_format='delta', # или 'iceberg' или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Сохранение описаний моделей

В dbt поддерживается сохранение документации на уровне relation.
Подробнее о настройке сохранения документации см. документацию.

Когда опция persist_docs настроена должным образом, вы сможете увидеть описания моделей в поле Comment команды describe [table] extended или show table extended in [database] like '*'.

Всегда schema, никогда database

Apache Spark использует термины "schema" и "database" взаимозаменяемо. dbt понимает database как существующий на более высоком уровне, чем schema. Таким образом, вы никогда не должны использовать или устанавливать database в качестве конфигурации узла или в целевом профиле при запуске dbt-spark.

Если вы хотите управлять схемой/базой данных, в которой dbt будет материализовать модели, используйте конфигурацию schema и макрос generate_schema_name только.

Конфигурации формата файлов по умолчанию

Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как снимки и инкрементальная стратегия merge, вы захотите использовать формат файлов Delta, Iceberg или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.

Это довольно удобно сделать, установив конфигурацию верхнего уровня в вашем файле проекта:

dbt_project.yml
models:
+file_format: delta # или iceberg или hudi

seeds:
+file_format: delta # или iceberg или hudi

snapshots:
+file_format: delta # или iceberg или hudi

Footnotes

  1. Если вы настраиваете location_root, dbt указывает путь размещения в выражении create table. Это изменяет тип таблицы с «managed» на «external» в Spark/Databricks.

Нашли ошибку?

0
Loading