Конфигурации Databricks
Конфигурация таблиц
При материализации модели как table
, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-databricks, в дополнение к стандартным конфигурациям моделей.
Стратегия append
Следуя стратегии append
, dbt выполнит оператор insert into
со всеми новыми данными. Привлекательность этой стратегии заключается в том, что она проста и функциональна на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
create temporary view databricks_incremental__dbt_tmp as
select * from analytics.events
where event_ts >= (select max(event_ts) from {{ this }})
;
insert into table analytics.databricks_incremental
select `date_day`, `users` from databricks_incremental__dbt_tmp
Стратегия insert_overwrite
Эта стратегия в настоящее время совместима только с кластерами общего назначения, а не с SQL-складами.
Эта стратегия наиболее эффективна, когда указана вместе с partition_by
в конфигурации вашей модели. dbt выполнит атомарный оператор insert overwrite
, который динамически заменяет все разделы, включенные в ваш запрос. Убедитесь, что вы повторно выбираете все соответствующие данные для раздела при использовании этой инкрементальной стратегии.
Если partition_by
не указано, то стратегия insert_overwrite
ат омарно заменит все содержимое таблицы, перезаписывая все существующие данные только новыми записями. Однако схема столбцов таблицы остается прежней. Это может быть желательным в некоторых ограниченных случаях, так как минимизирует время простоя при перезаписи содержимого таблицы. Операция сравнима с выполнением truncate
+ insert
на других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table
(которая выполняет create or replace
).
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}
/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from new_events
group by 1
create temporary view databricks_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from events
group by 1
;
insert overwrite table analytics.databricks_incremental
partition (date_day)
select `date_day`, `users` from databricks_incremental__dbt_tmp
Стратегия merge
Инкрементальная стратегия merge
требует:
file_format: delta или hudi
- Databricks Runtime 5.1 и выше для формата файла delta
- Apache Spark для формата файла hudi
Адаптер Databricks выполнит атомарный оператор merge
, аналогичный поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key
(рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key
не указан, dbt пропустит критерии совпадения и просто вставит все новые записи (аналогично стратегии append
).
Указание merge
в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда никакая не указана.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta', # или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view merge_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
merge into analytics.merge_incremental as DBT_INTERNAL_DEST
using merge_incremental__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *
Стратегия replace_where
Инкрементальная стратегия replace_where
требует:
file_format: delta
- Databricks Runtime 12.0 и выше
dbt выполнит атомарный оператор replace where
, который избирательно перезаписывает данные, соответствующие одному или нескольким incremental_predicates
, указанным в виде строки или массива. Только строки, соответствующие предикатам, будут вставлены. Если incremental_predicates
не указаны, dbt выполнит атомарную вставку, как в случае append
.
replace_where
вставляет данные в столбцы в порядке их предоставления, а не по имени столбца. Если вы измените порядок столбцов и данные совместимы с существующей схемой, вы можете незаметно вставить значения в неожиданный столбец. Если входные данные несовместимы с существующей схемой, вы получите ошибку.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'replace_where'
incremental_predicates = 'user_id >= 10000' # Никогда не заменяйте пользователей с id < 10000
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view replace_where__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
insert into analytics.replace_where_incremental
replace where user_id >= 10000
table `replace_where__dbt_tmp`
Выбор вычислительных ресурсов для каждой модели
Начиная с версии 1.7.2, вы можете назначать, какой вычислительный ресурс использовать для каждой модели. Для SQL-моделей вы можете выбрать SQL Warehouse (безсерверный или предоставленный) или кластер общего назначения. Для получения подробной информации о том, как эта функция взаимодействует с моделями Python, см. Указание вычислительных ресурсов для моделей Python.
Это необязательная настройка. Если вы не настроите это, как показано ниже, мы будем использовать вычислительные ресурсы, указанные в http_path в верхнем уровне раздела output в вашем профиле. Это также вычислительные ресурсы, которые будут использоваться для задач, не связанных с конкретной моделью, таких как сбор метаданных для всех таблиц в схеме.
Чтобы воспользоваться этой возможностью, вам нужно будет добавить блоки вычислений в ваш профиль:
profile-name:
target: target-name # это целевой объект по умолчанию
outputs:
target-name:
type: databricks
catalog: optional catalog name if you are using Unity Catalog
schema: schema name # Обязательно
host: yourorg.databrickshost.com # Обязательно
### Этот путь используется как вычислительный ресурс по умолчанию
http_path: /sql/your/http/path # Обязательно
### Новый раздел вычислений
compute:
### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса
### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...
target-name: # дополнительные целевые объекты
...
### Для каждого целевого объекта вам нужно определить те же вычислительные ресурсы,
### но вы можете указать разные пути
compute:
### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса
### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...
Новый раздел вычислений представляет собой карту имен, выбранных пользователем, к объектам с свойством http_path. Каждый вычислительный ресурс имеет ключ, который используется в определении/конфигурации модели для указания, какой вычислительный ресурс вы хотите использовать для этой модели/выбора моделей. Мы рекомендуем выбирать имя, которое легко распознается как используемые вами вычислительные ресурсы, например, имя вычислительного ресурса в интерфейсе Databricks.
Вам нужно использовать один и тот же набор имен для вычислительных ресурсов во всех ваших выходных данных, хотя вы можете предоставить разные http_paths, что позволяет использовать разные вычислительные ресурсы в разных сценариях развертывания.
Чтобы настроить это в dbt Cloud, используйте функцию расширенных атрибутов на нужных средах:
compute:
Compute1:
http_path: /SOME/OTHER/PATH
Compute2:
http_path: /SOME/OTHER/PATH
Указание вычислительных ресурсов для моделей
Как и во многих других опциях конфигурации, вы можете указать вычислительные ресурсы для модели несколькими способами, используя databricks_compute
.
В вашем dbt_project.yml
выбранные вычислительные ресурсы могут быть указаны для всех моделей в заданном каталоге:
...
models:
+databricks_compute: "Compute1" # используйте склад/кластер `Compute1` для всех моделей в проекте...
my_project:
clickstream:
+databricks_compute: "Compute2" # ...за исключением моделей в папке `clickstream`, которые будут использовать `Compute2`.
snapshots:
+databricks_compute: "Compute1" # все модели Snapshot настроены на использование `Compute1`.
Для отдельной модели вычислительные ресурсы могут быть указаны в конфигурации модели в вашем файле схемы.
models:
- name: table_model
config:
databricks_compute: Compute1
columns:
- name: id
data_type: int
В качестве альтернативы склад может быть указан в блоке конфигурации SQL-файла модели.
{{
config(
materialized='table',
databricks_compute='Compute1'
)
}}
select * from {{ ref('seed') }}
Чтобы убедиться, что указанные вычислительные ресурсы используются, ищите строки в вашем dbt.log, такие как:
Адаптер Databricks ... использует вычислительный ресурс по умолчанию.
или
Адаптер Databricks ... использует вычислительный ресурс <имя вычислительного ресурса>.
Указание вычислительных ресурсов для моделей Python
Материализация модели Python требует выполнения SQL, а также Python. В частности, если ваша модель Python является инкрементальной, текущий шаблон выполнения включает выполнение Python для создания промежуточной таблицы, которая затем объединяется с вашей целевой таблицей с использованием SQL.
Когда вы указываете databricks_compute
для модели Python, вы в настоящее время указываете только, какой вычислительный ресурс использовать при выполнении SQL, специфичного для модели.
Если вы хотите использовать другой вычислительный ресурс для выполнения самого Python, вы должны указать альтернативный вычислительный ресурс в конфигурации модели.
Например:
def model(dbt, session):
dbt.config(
http_path="sql/protocolv1/..."
)
Если ваш вычислительный ресурс по умолчанию является SQL-складом, вам нужно будет указать http_path кластера общего назначения таким образом.
Сохранение описаний моделей
Поддержка сохранения документов на уровне отношений доступна в dbt v0.17.0. Для получения дополнительной информации о настройке сохранения документов см. документацию.
Когда опция persist_docs
настроена соответствующим образом, вы сможете
увидеть описания моделей в поле Comment
команды describe [table] extended
или show table extended in [database] like '*'
.
Конфигурации формата файлов по умолчанию
Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как
снимки и инкрементальная стратегия merge
, вы захотите
использовать формат файлов Delta или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.
Это довольно удобно сделать, установив верхнеуровневую конфигурацию в вашем файле проекта:
models:
+file_format: delta # или hudi
seeds:
+file_format: delta # или hudi
snapshots:
+file_format: delta # или hudi
Установка свойств таблицы
Свойства таблицы могут быть установлены с вашей конфигурацией для таблиц или представлений с использованием tblproperties
:
{{ config(
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
'delta.autoOptimize.autoCompact' : 'true'
}
) }}
Эти свойства отправляются напрямую в Databricks без проверки в dbt, поэтому будьте внимательны при использовании этой функции. Вам нужно будет выполнить полное обновление инкрементальных материализаций, если вы измените их tblproperties
.
Одним из применений этой функции является обеспечение совместимости таблиц delta
с читателями iceberg
с использованием Универсального формата.
{{ config(
tblproperties={
'delta.enableIcebergCompatV2' = 'true'
'delta.universalFormat.enabledFormats' = 'iceberg'
}
) }}
tblproperties
могут быть указаны для моделей Python, но они будут применены через опер атор ALTER
после создания таблицы.
Это связано с ограничением в PySpark.