Перейти к основному содержимому

Конфигурации Databricks

Конфигурация таблиц

При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-databricks, в дополнение к стандартным конфигурациям моделей.

Стратегия append

Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в том, что она проста и функциональна на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.

databricks_incremental.sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

Стратегия insert_overwrite

предупреждение

Эта стратегия в настоящее время совместима только с кластерами общего назначения, а не с SQL-складами.

Эта стратегия наиболее эффективна, когда указана вместе с partition_by в конфигурации вашей модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все разделы, включенные в ваш запрос. Убедитесь, что вы повторно выбираете все соответствующие данные для раздела при использовании этой инкрементальной стратегии.

Если partition_by не указано, то стратегия insert_overwrite атомарно заменит все содержимое таблицы, перезаписывая все существующие данные только новыми записями. Однако схема столбцов таблицы остается прежней. Это может быть желательным в некоторых ограниченных случаях, так как минимизирует время простоя при перезаписи содержимого таблицы. Операция сравнима с выполнением truncate + insert на других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table (которая выполняет create or replace).

databricks_incremental.sql
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}

/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from new_events
group by 1

Стратегия merge

Инкрементальная стратегия merge требует:

  • file_format: delta или hudi
  • Databricks Runtime 5.1 и выше для формата файла delta
  • Apache Spark для формата файла hudi

Адаптер Databricks выполнит атомарный оператор merge, аналогичный поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt пропустит критерии совпадения и просто вставит все новые записи (аналогично стратегии append).

Указание merge в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда никакая не указана.

merge_incremental.sql
{{ config(
materialized='incremental',
file_format='delta', # или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Стратегия replace_where

Инкрементальная стратегия replace_where требует:

  • file_format: delta
  • Databricks Runtime 12.0 и выше

dbt выполнит атомарный оператор replace where, который избирательно перезаписывает данные, соответствующие одному или нескольким incremental_predicates, указанным в виде строки или массива. Только строки, соответствующие предикатам, будут вставлены. Если incremental_predicates не указаны, dbt выполнит атомарную вставку, как в случае append.

предупреждение

replace_where вставляет данные в столбцы в порядке их предоставления, а не по имени столбца. Если вы измените порядок столбцов и данные совместимы с существующей схемой, вы можете незаметно вставить значения в неожиданный столбец. Если входные данные несовместимы с существующей схемой, вы получите ошибку.

replace_where_incremental.sql
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'replace_where'
incremental_predicates = 'user_id >= 10000' # Никогда не заменяйте пользователей с id < 10000
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Выбор вычислительных ресурсов для каждой модели

Начиная с версии 1.7.2, вы можете назначать, какой вычислительный ресурс использовать для каждой модели. Для SQL-моделей вы можете выбрать SQL Warehouse (безсерверный или предоставленный) или кластер общего назначения. Для получения подробной информации о том, как эта функция взаимодействует с моделями Python, см. Указание вычислительных ресурсов для моделей Python.

примечание

Это необязательная настройка. Если вы не настроите это, как показано ниже, мы будем использовать вычислительные ресурсы, указанные в http_path в верхнем уровне раздела output в вашем профиле. Это также вычислительные ресурсы, которые будут использоваться для задач, не связанных с конкретной моделью, таких как сбор метаданных для всех таблиц в схеме.

Чтобы воспользоваться этой возможностью, вам нужно будет добавить блоки вычислений в ваш профиль:

profile.yml

profile-name:
target: target-name # это целевой объект по умолчанию
outputs:
target-name:
type: databricks
catalog: optional catalog name if you are using Unity Catalog
schema: schema name # Обязательно
host: yourorg.databrickshost.com # Обязательно

### Этот путь используется как вычислительный ресурс по умолчанию
http_path: /sql/your/http/path # Обязательно

### Новый раздел вычислений
compute:

### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса

### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...

target-name: # дополнительные целевые объекты
...
### Для каждого целевого объекта вам нужно определить те же вычислительные ресурсы,
### но вы можете указать разные пути
compute:

### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса

### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...

Новый раздел вычислений представляет собой карту имен, выбранных пользователем, к объектам с свойством http_path. Каждый вычислительный ресурс имеет ключ, который используется в определении/конфигурации модели для указания, какой вычислительный ресурс вы хотите использовать для этой модели/выбора моделей. Мы рекомендуем выбирать имя, которое легко распознается как используемые вами вычислительные ресурсы, например, имя вычислительного ресурса в интерфейсе Databricks.

примечание

Вам нужно использовать один и тот же набор имен для вычислительных ресурсов во всех ваших выходных данных, хотя вы можете предоставить разные http_paths, что позволяет использовать разные вычислительные ресурсы в разных сценариях развертывания.

Чтобы настроить это в dbt Cloud, используйте функцию расширенных атрибутов на нужных средах:


compute:
Compute1:
http_path: /SOME/OTHER/PATH
Compute2:
http_path: /SOME/OTHER/PATH

Указание вычислительных ресурсов для моделей

Как и во многих других опциях конфигурации, вы можете указать вычислительные ресурсы для модели несколькими способами, используя databricks_compute. В вашем dbt_project.yml выбранные вычислительные ресурсы могут быть указаны для всех моделей в заданном каталоге:

dbt_project.yml

...

models:
+databricks_compute: "Compute1" # используйте склад/кластер `Compute1` для всех моделей в проекте...
my_project:
clickstream:
+databricks_compute: "Compute2" # ...за исключением моделей в папке `clickstream`, которые будут использовать `Compute2`.

snapshots:
+databricks_compute: "Compute1" # все модели Snapshot настроены на использование `Compute1`.

Для отдельной модели вычислительные ресурсы могут быть указаны в конфигурации модели в вашем файле схемы.

schema.yml

models:
- name: table_model
config:
databricks_compute: Compute1
columns:
- name: id
data_type: int

В качестве альтернативы склад может быть указан в блоке конфигурации SQL-файла модели.

model.sql

{{
config(
materialized='table',
databricks_compute='Compute1'
)
}}
select * from {{ ref('seed') }}

Чтобы убедиться, что указанные вычислительные ресурсы используются, ищите строки в вашем dbt.log, такие как:

Адаптер Databricks ... использует вычислительный ресурс по умолчанию.

или

Адаптер Databricks ... использует вычислительный ресурс <имя вычислительного ресурса>.

Указание вычислительных ресурсов для моделей Python

Материализация модели Python требует выполнения SQL, а также Python. В частности, если ваша модель Python является инкрементальной, текущий шаблон выполнения включает выполнение Python для создания промежуточной таблицы, которая затем объединяется с вашей целевой таблицей с использованием SQL.

Когда вы указываете databricks_compute для модели Python, вы в настоящее время указываете только, какой вычислительный ресурс использовать при выполнении SQL, специфичного для модели. Если вы хотите использовать другой вычислительный ресурс для выполнения самого Python, вы должны указать альтернативный вычислительный ресурс в конфигурации модели. Например:

model.py

def model(dbt, session):
dbt.config(
http_path="sql/protocolv1/..."
)

Если ваш вычислительный ресурс по умолчанию является SQL-складом, вам нужно будет указать http_path кластера общего назначения таким образом.

Сохранение описаний моделей

Поддержка сохранения документов на уровне отношений доступна в dbt v0.17.0. Для получения дополнительной информации о настройке сохранения документов см. документацию.

Когда опция persist_docs настроена соответствующим образом, вы сможете увидеть описания моделей в поле Comment команды describe [table] extended или show table extended in [database] like '*'.

Конфигурации формата файлов по умолчанию

Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как снимки и инкрементальная стратегия merge, вы захотите использовать формат файлов Delta или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.

Это довольно удобно сделать, установив верхнеуровневую конфигурацию в вашем файле проекта:

dbt_project.yml
models:
+file_format: delta # или hudi

seeds:
+file_format: delta # или hudi

snapshots:
+file_format: delta # или hudi

Установка свойств таблицы

Свойства таблицы могут быть установлены с вашей конфигурацией для таблиц или представлений с использованием tblproperties:

with_table_properties.sql
{{ config(
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
'delta.autoOptimize.autoCompact' : 'true'
}
) }}
предупреждение

Эти свойства отправляются напрямую в Databricks без проверки в dbt, поэтому будьте внимательны при использовании этой функции. Вам нужно будет выполнить полное обновление инкрементальных материализаций, если вы измените их tblproperties.

Одним из применений этой функции является обеспечение совместимости таблиц delta с читателями iceberg с использованием Универсального формата.

{{ config(
tblproperties={
'delta.enableIcebergCompatV2' = 'true'
'delta.universalFormat.enabledFormats' = 'iceberg'
}
) }}

tblproperties могут быть указаны для моделей Python, но они будут применены через оператор ALTER после создания таблицы. Это связано с ограничением в PySpark.

0