Конфигурации Databricks
Конфигурация таблиц
При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-databricks, в дополнение к стандартным конфигурациям моделей.
dbt-databricks v1.9 добавляет поддержку конфигурации table_format: iceberg. Попробуйте уже сейчас в ветке релизов dbt "Latest". Все остальные конфигурации таблиц также поддерживались в версии 1.8.
| Loading table... |
* В настоящее время в PySpark нет API для задания tblproperties при создании таблицы, поэтому эта возможность в первую очередь предназначена для того, чтобы пользователи могли аннотировать таблицы, созданные из Python, с помощью tblproperties.
† When table_format is iceberg, file_format must be delta.
‡ databricks_tags are applied via ALTER statements. Tags cannot be removed via dbt-databricks once applied. To remove tags, use Databricks directly or a post-hook.
^ Когда включён liquid_clustered_by, dbt-databricks выполняет операцию OPTIMIZE (Liquid Clustering) после каждого запуска. Чтобы отключить это поведение, установите переменную DATABRICKS_SKIP_OPTIMIZE=true, которую можно передать в команду запуска dbt (dbt run --vars "{'databricks_skip_optimize': true}") или задать как переменную окружения. См. issue #802.
+ Do not use liquid_clustered_by and auto_liquid_cluster on the same model.
В dbt-databricks v1.10 появилось несколько новых вариантов конфигурации моделей, доступных при включённом флаге use_materialization_v2.
Подробнее см. документацию по флагам поведения Databricks.
Методы отправки Python
Доступно в версиях 1.9 и выше
В dbt-databricks v1.9 (попробуйте уже сейчас в треке релизов dbt «Latest») вы можете использовать следующие четыре варианта для submission_method:
all_purpose_cluster: Выполняет модель Python либо напрямую с использованием command api, либо загружая блокнот и создавая одноразовый запуск заданияjob_cluster: Создает новый кластер заданий для выполнения загруженного блокнота как одноразового запуска заданияserverless_cluster: Использует безсерверный кластер для выполнения загруженного блокнота как одноразового запуска заданияworkflow_job: Создает/обновляет повторно используемый рабочий процесс и загруженный блокнот для выполнения на кластерах общего назначения, задания или безсерверных кластерах.предупреждениеЭтот подход дает вам максимальную гибкость, но создаст постоянные артефакты в Databricks (рабочий процесс), которые пользователи могут запускать вне dbt.
В настоящее время мы находимся в переходном периоде, когда существует разрыв между старыми методами отправки (которые были сгруппированы по вычислениям) и логически различными методами отправки (команда, запуск задания, рабочий процесс).
Таким образом, поддерживаемая матрица конфигурации несколько сложна:
| Loading table... |
* Только timeout и cluster_id/http_path поддерживаются, когда create_notebook равно false
С введением метода отправки workflow_job, мы выбрали дальнейшую сегрегацию конфигурации отправки модели Python под верхнеуровневую конфигурацию, названную python_job_config. Это позволяет конфигурационным опциям для заданий и рабочих процессов быть пространственно именованными таким образом, чтобы они не мешали другим конфигурациям модели, что позволяет нам быть гораздо более гибкими в том, что поддерживается для выполнения заданий.
Матрица поддержки для этой функции разделена на workflow_job и все остальные (предполагая all_purpose_cluster с create_notebook==true).
Каждая опция конфигурации, указанная ниже, должна быть вложена под python_job_config:
| Loading table... |
Этот пример использует новые опции конфигурации из предыдущей таблицы:
models:
- name: my_model
config:
submission_method: workflow_job
# Определите кластер заданий для создания для выполнения этого рабочего процесса
# В качестве альтернативы можно указать cluster_id для использования существующего кластера или не указывать ни одного, чтобы использовать безсерверный кластер
job_cluster_config:
spark_version: "15.3.x-scala2.12"
node_type_id: "rd-fleet.2xlarge"
runtime_engine: "{{ var('job_cluster_defaults.runtime_engine') }}"
data_security_mode: "{{ var('job_cluster_defaults.data_security_mode') }}"
autoscale: { "min_workers": 1, "max_workers": 4 }
python_job_config:
# Эти настройки передаются, как есть, в запрос
email_notifications: { on_failure: ["me@example.com"] }
max_retries: 2
name: my_workflow_name
# Переопределите настройки для задачи dbt вашей модели. Например, вы можете
# изменить ключ задачи
additional_task_settings: { "task_key": "my_dbt_task" }
# Определите задачи для выполнения до/после модели
# Этот пример предполагает, что вы уже загрузили блокнот в /my_notebook_path для выполнения оптимизации и очистки
post_hook_tasks:
[
{
"depends_on": [{ "task_key": "my_dbt_task" }],
"task_key": "OPTIMIZE_AND_VACUUM",
"notebook_task":
{ "notebook_path": "/my_notebook_path", "source": "WORKSPACE" },
},
]
# Упрощенная структура, вместо необходимости указывать разрешения отдельно для каждого пользователя
grants:
view: [{ "group_name": "marketing-team" }]
run: [{ "user_name": "other_user@example.com" }]
manage: []
Настройка колонок
Доступно в версиях 1.10 и выше
При материализации моделей различных типов вы можете указывать несколько необязательных конфигураций на уровне колонок, которые специфичны для плагина dbt-databricks, в дополнение к стандартным настройкам колонок. Поддержка тегов колонок и масок колонок была добавлена в dbt-databricks версии 1.10.4.
| Loading table... |
* using_columns supports all parameter types listed in Databricks column mask parameters.
† databricks_tags are applied via ALTER statements. Tags cannot be removed via dbt-databricks once applied. To remove tags, use Databricks directly or a post-hook.
This example uses the column-level configurations in the previous table:
models:
- name: customers
columns:
- name: customer_id
databricks_tags:
data_classification: "public"
- name: email
databricks_tags:
data_classification: "pii"
column_mask:
function: my_catalog.my_schema.mask_email
using_columns: "customer_id, 'literal string'"
Инкрементальные модели
Доступно в версиях 1.9 и выше
dbt-databricks v1.11.0 требует Databricks Runtime 12.2 LTS или выше для инкрементальных моделей
В этой версии добавлено исправление проблемы с несоответствием порядка колонок в инкрементальных моделях за счёт использования синтаксиса Databricks INSERT BY NAME (доступен начиная с DBR 12.2). Это предотвращает повреждение данных, которое могло возникать при изменении порядка колонок в моделях с настройкой on_schema_change: sync_all_columns.
Если вы используете более старую версию runtime:
- Зафиксируйте версию
dbt-databricksна1.10.x - Или обновитесь до DBR 12.2 LTS или выше
Это ломающее изменение затрагивает все стратегии инкрементальной загрузки: append, insert_overwrite, replace_where, delete+insert и merge (через создание промежуточной таблицы).
Подробнее об изменениях в v1.11.0 см. в dbt-databricks v1.11.0 changelog.
Плагин dbt-databricks в значительной степени опирается на конфигурацию incremental_strategy. Эта настройка определяет, как инкрементальная материализация будет собирать модель при запусках после первого. Она может принимать одно из шести значений:
append: Вставляет новые записи, не обновляя и не перезаписывая существующие данные.insert_overwrite: Если заданpartition_by, перезаписывает соответствующие партиции в table новыми данными. Еслиpartition_byне указан, перезаписывает всю таблицу новыми данными.merge(по умолчанию; только для форматов файлов Delta и Hudi): Сопоставляет записи на основеunique_key, обновляя существующие записи и вставляя новые. (Еслиunique_keyне указан, все новые данные вставляются — аналогичноappend.)replace_where(только для формата файлов Delta): Сопоставляет записи на основеincremental_predicates, заменяя все записи в существующей таблице, которые соответствуют предикатам, на записи из новых данных с теми же предикатами. (Еслиincremental_predicatesне указаны, все новые данные вставляются — аналогичноappend.)delete+insert(только для формата файлов Delta, доступно в v1.11+): Сопоставляет записи на основе обязательногоunique_key, удаляет совпадающие записи и вставляет новые. Дополнительно может применяться фильтрация с помощьюincremental_predicates.microbatch(только для формата файлов Delta): Реализует стратегию microbatch, используяreplace_whereс предикатами, которые генерируются на основеevent_time.
Каждая из этих стратегий имеет свои плюсы и минусы, которые мы обсудим ниже. Как и в случае любой конфигурации модели, incremental_strategy может быть указана в dbt_project.yml или в блоке config() файла модели.
Стратегия append
Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в том, что она проста и функциональна на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
create temporary view databricks_incremental__dbt_tmp as
select * from analytics.events
where event_ts >= (select max(event_ts) from {{ this }})
;
insert into table analytics.databricks_incremental
select `date_day`, `users` from databricks_incremental__dbt_tmp
Стратегия insert_overwrite
Стратегия insert_overwrite обновляет данные в таблице, заменяя существующие записи, а не просто добавляя новые. Эта стратегия наиболее эффективна, когда она указана вместе с параметром partition_by или liquid_clustered_by в конфигурации модели — это помогает определить конкретные партиции или кластеры, на которые влияет ваш запрос. dbt выполнит атомарный оператор insert into ... replace on, который динамически заменяет все партиции или кластеры, затронутые запросом, вместо перестроения всей таблицы целиком.
Важно! При использовании этой инкрементальной стратегии обязательно повторно выбирайте (re-select) все релевантные данные для соответствующей партиции или кластера.
При использовании liquid_clustered_by ключи replace on будут совпадать с ключами liquid_clustered_by (аналогично поведению partition_by).
Если установить use_replace_on_for_insert_overwrite в True (в SQL warehouses или при использовании кластерных вычислений), dbt будет динамически перезаписывать партиции и заменять только те партиции или кластеры, которые возвращаются запросом модели. В этом случае dbt выполняет оператор insert overwrite с partitionOverwriteMode='dynamic', что помогает сократить количество ненужных перезаписей и повысить производительность.
Если установить use_replace_on_for_insert_overwrite в False в SQL warehouses, dbt будет обрезать (полностью очищать) таблицу перед вставкой новых данных. Это приводит к замене всех строк таблицы при каждом запуске модели, что может увеличить время выполнения и стоимость для больших наборов данных.
Если вы не укажете partition_by или liquid_clustered_by, стратегия insert_overwrite будет атомарно заменять всё содержимое таблицы, перезаписывая все существующие данные только новыми записями. При этом схема столбцов таблицы остаётся неизменной. В некоторых ограниченных случаях такое поведение может быть желательным, так как оно минимизирует простой во время перезаписи содержимого таблицы. Эта операция сопоставима с выполнением truncate и insert в других базах данных. Для атомарной замены таблиц в формате Delta вместо этого используйте материализацию table (которая выполняет create or replace).
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}
/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from new_events
group by 1
create temporary view databricks_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from events
group by 1
;
insert overwrite table analytics.databricks_incremental
partition (date_day)
select `date_day`, `users` from databricks_incremental__dbt_tmp
Стратегия merge
Инкрементальная стратегия merge требует:
file_format: delta или hudi- Databricks Runtime 5.1 и выше для формата файла delta
- Apache Spark для формата файла hudi
Адаптер Databricks выполнит атомарный оператор merge, аналогичный поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt пропустит критерии совпадения и просто вставит все новые записи (аналогично стратегии append).
Указание merge в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда никакая не указана.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta', # или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view merge_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
merge into analytics.merge_incremental as DBT_INTERNAL_DEST
using merge_incremental__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *
Начиная с версии 1.9, поведение merge можно изменить с помощью следующих дополнительных параметров конфигурации:
target_alias,source_alias: Алиасы для целевой и исходной таблиц, которые позволяют более наглядно описывать условия merge. По умолчанию используютсяDBT_INTERNAL_DESTиDBT_INTERNAL_SOURCEсоответственно.skip_matched_step: Если установлено вtrue, секцияmatchedв операторе merge не будет включена.skip_not_matched_step: Если установлено вtrue, секцияnot matchedне будет включена.matched_condition: Условие, применяемое к секцииWHEN MATCHED. Для написания условного выражения следует использоватьtarget_aliasиsource_alias, например:DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3). Это условие дополнительно ограничивает набор строк, считающихся совпавшими.not_matched_condition: Условие, применяемое к секцииWHEN NOT MATCHED [BY TARGET]. Это условие дополнительно ограничивает набор строк в целевой таблице, которые не совпадают с источником и будут вставлены в результирующую таблицу.not_matched_by_source_condition: Условие, применяемое как дополнительный фильтр в секцииWHEN NOT MATCHED BY SOURCE. Используется только совместно сnot_matched_by_source_action.not_matched_by_source_action: Действие, которое выполняется при выполнении условия. Настраивается как выражение. Например:not_matched_by_source_action: "update set t.attr1 = 'deleted', t.tech_change_ts = current_timestamp()".merge_with_schema_evolution: Если установлено вtrue, оператор merge будет включать секциюWITH SCHEMA EVOLUTION.
Для получения более подробной информации о значении каждого условия слияния, пожалуйста, обратитесь к документации Databricks.
Ниже приведен пример, демонстрирующий использование этих новых опций:
- Исходный код
- Код выполнения
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
target_alias='t',
source_alias='s',
matched_condition='t.tech_change_ts < s.tech_change_ts',
not_matched_condition='s.attr1 IS NOT NULL',
not_matched_by_source_condition='t.tech_change_ts < current_timestamp()',
not_matched_by_source_action='delete',
merge_with_schema_evolution=true
) }}
select
id,
attr1,
attr2,
tech_change_ts
from
{{ ref('source_table') }} as s
create temporary view merge_incremental__dbt_tmp as
select
id,
attr1,
attr2,
tech_change_ts
from upstream.source_table
;
merge
with schema evolution
into
target_table as t
using (
select
id,
attr1,
attr2,
tech_change_ts
from
source_table as s
)
on
t.id <=> s.id
when matched
and t.tech_change_ts < s.tech_change_ts
then update set
id = s.id,
attr1 = s.attr1,
attr2 = s.attr2,
tech_change_ts = s.tech_change_ts
when not matched
and s.attr1 IS NOT NULL
then insert (
id,
attr1,
attr2,
tech_change_ts
) values (
s.id,
s.attr1,
s.attr2,
s.tech_change_ts
)
when not matched by source
and t.tech_change_ts < current_timestamp()
then delete
Стратегия replace_where
Инкрементальная стратегия replace_where требует:
file_format: delta- Databricks Runtime 12.0 и выше
dbt выполнит атомарный оператор replace where, который избирательно перезаписывает данные, соответствующие одному или нескольким incremental_predicates, указанным в виде строки или массива. Только строки, соответствующие предикатам, будут вставлены. Если incremental_predicates не указаны, dbt выполнит атомарную вставку, как в случае append.
replace_where вставляет данные в столбцы в порядке их предоставления, а не по имени столбца. Если вы измените порядок столбцов и данные совместимы с существующей схемой, вы можете незаметно вставить значения в неожиданный столбец. Если входные данные несовместимы с существующей схемой, вы получите ошибку.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'replace_where'
incremental_predicates = 'user_id >= 10000' # Никогда не заменяйте пользователей с id < 10000
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
create temporary view replace_where__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
;
insert into analytics.replace_where_incremental
replace where user_id >= 10000
table `replace_where__dbt_tmp`
Стратегия delete+insert
Доступно в версиях 1.11 или выше
Инкрементальная стратегия delete+insert требует:
file_format: delta- Обязательной конфигурации
unique_key - Databricks Runtime 12.2 LTS или выше
The delete+insert strategy is a simpler alternative to the merge strategy for cases where you want to replace matching records without the complexity of updating specific columns. This strategy works in two steps:
- Delete: Remove all rows from the target table where the
unique_keymatches rows in the new data. - Insert: Insert all new rows from the staging data.
This strategy is particularly useful when:
- You want to replace entire records rather than update specific columns
- Your business logic requires a clean "remove and replace" approach
- You need a simpler incremental strategy than
mergefor full record replacement
При использовании Databricks Runtime версии 17.1 или выше dbt применяет эффективный синтаксис INSERT INTO ... REPLACE ON для атомарного выполнения этой операции. Для более старых версий runtime dbt выполняет отдельные операторы DELETE и INSERT.
При необходимости вы можете использовать incremental_predicates, чтобы дополнительно отфильтровать обрабатываемые записи, получив больший контроль над тем, какие строки удаляются и вставляются.
- Source code
- Run code (DBR 17.1+)
- Run code (DBR < 17.1)
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy='delete+insert',
unique_key='user_id'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from new_events
group by 1
create temporary view delete_insert_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from new_events
group by 1
;
insert into table analytics.delete_insert_incremental as target
replace on (target.user_id <=> temp.user_id)
(select `user_id`, `last_seen`
from delete_insert_incremental__dbt_tmp where date_day >= date_add(current_date, -1)) as temp
create temporary view delete_insert_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
user_id,
max(date_day) as last_seen
from new_events
group by 1
;
-- Step 1: Delete matching rows
delete from analytics.delete_insert_incremental
where analytics.delete_insert_incremental.user_id IN (SELECT user_id FROM delete_insert_incremental__dbt_tmp)
and date_day >= date_add(current_date, -1);
-- Step 2: Insert new rows
insert into analytics.delete_insert_incremental by name
select `user_id`, `last_seen`
from delete_insert_incremental__dbt_tmp
where date_day >= date_add(current_date, -1)
Стратегия microbatch
Доступно в версиях 1.9 и выше
Адаптер Databricks реализует стратегию microbatch, используя replace_where. Обратите внимание на требования и предупреждения, указанные выше для replace_where. Дополнительную информацию об этой стратегии см. на странице справки microbatch.
В следующем примере в таблице events добавлен столбец event_time, называемый ts, в ее схеме.
- Исходный код
- Код выполнения
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'microbatch'
event_time='date' # Используйте 'date' как зерно для этой таблицы микропакетов
) }}
with new_events as (
select * from {{ ref('events') }}
)
select
user_id,
date,
count(*) as visits
from events
group by 1, 2
create temporary view replace_where__dbt_tmp as
with new_events as (
select * from (select * from analytics.events where ts >= '2024-10-01' and ts < '2024-10-02')
)
select
user_id,
date,
count(*) as visits
from events
group by 1, 2
;
insert into analytics.replace_where_incremental
replace where CAST(date as TIMESTAMP) >= '2024-10-01' and CAST(date as TIMESTAMP) < '2024-10-02'
table `replace_where__dbt_tmp`
Python model configuration
Адаптер Databricks поддерживает Python-модели. Databricks использует PySpark в качестве фреймворка обработки для этих моделей.
Методы отправки: Databricks поддерживает несколько различных механизмов отправки кода PySpark, каждый из которых имеет свои относительные преимущества. Одни лучше подходят для итеративной разработки, другие — для более дешёвых продакшн-развёртываний. Доступны следующие варианты:
all_purpose_cluster(по умолчанию): dbt будет запускать вашу Python-модель, используя идентификатор кластера, настроенный какclusterв вашем профиле подключения или для конкретной модели. Эти кластеры дороже, но значительно более отзывчивы. Мы рекомендуем использовать интерактивный all-purpose кластер для более быстрой итерации при разработке.create_notebook: True: dbt загрузит скомпилированный PySpark-код вашей модели в ноутбук в пространстве имён/Shared/dbt_python_model/{schema}, где{schema}— это настроенная схема модели, и выполнит этот ноутбук с использованием all-purpose кластера. Преимущество этого подхода в том, что вы можете легко открыть ноутбук в интерфейсе Databricks для отладки или тонкой настройки сразу после выполнения модели. Не забудьте скопировать все изменения обратно в код вашей dbt-модели.pyперед повторным запуском.create_notebook: False(по умолчанию): dbt будет использовать Command API, который работает немного быстрее.
job_cluster: dbt загрузит скомпилированный PySpark-код вашей модели в ноутбук в пространстве имён/Shared/dbt_python_model/{schema}, где{schema}— это настроенная схема модели, и выполнит этот ноутбук, используя краткоживущий jobs-кластер. Для каждой Python-модели Databricks потребуется поднять кластер, выполнить PySpark-трансформацию модели, а затем остановить кластер. Поэтому job-кластеры требуют больше времени до и после выполнения модели, но они также дешевле, поэтому мы рекомендуем их для длительно выполняющихся Python-моделей в продакшене. Чтобы использовать метод отправкиjob_cluster, ваша модель должна быть настроена сjob_cluster_config, который определяет свойства ключ-значение дляnew_cluster, как описано в JobRunsSubmit API.
Вы можете настроить submission_method для каждой модели всеми стандартными способами задания конфигурации:
def model(dbt, session):
dbt.config(
submission_method="all_purpose_cluster",
create_notebook=True,
cluster_id="abcd-1234-wxyz"
)
...
models:
- name: my_python_model
config:
submission_method: job_cluster
job_cluster_config:
spark_version: ...
node_type_id: ...
# dbt_project.yml
models:
project_name:
subfolder:
# set defaults for all .py models defined in this subfolder
+submission_method: all_purpose_cluster
+create_notebook: False
+cluster_id: abcd-1234-wxyz
Если конфигурация не задана, dbt-spark будет использовать встроенные значения по умолчанию: all-purpose кластер (на основе cluster в вашем профиле подключения) без создания ноутбука. Адаптер dbt-databricks по умолчанию будет использовать кластер, настроенный в http_path. Мы рекомендуем явно настраивать кластеры для Python-моделей в проектах Databricks.
Installing packages: When using all-purpose clusters, we recommend installing packages which you will be using to run your Python models.
Related docs:
Выбор вычислительных ресурсов для каждой модели
Начиная с версии 1.7.2, вы можете назначать, какой вычислительный ресурс использовать для каждой модели. Для SQL-моделей вы можете выбрать SQL Warehouse (безсерверный или предоставленный) или кластер общего назначения. Для получения подробной информации о том, как эта функция взаимодействует с моделями Python, см. Указание вычислительных ресурсов для моделей Python.
Это необязательная настройка. Если вы не настроите это, как показано ниже, мы будем использовать вычислительные ресурсы, указанные в http_path в верхнем уровне раздела output в вашем профиле. Это также вычислительные ресурсы, которые будут использоваться для задач, не связанных с конкретной моделью, таких как сбор метаданных для всех таблиц в схеме.
Чтобы воспользоваться этой возможностью, вам нужно будет добавить блоки вычислений в ваш профиль:
profile-name:
target: target-name # это целевой объект по умолчанию
outputs:
target-name:
type: databricks
catalog: optional catalog name if you are using Unity Catalog
schema: schema name # Обязательно
host: yourorg.databrickshost.com # Обязательно
### Этот путь используется как вычислительный ресурс по умолчанию
http_path: /sql/your/http/path # Обязательно
### Новый раздел вычислений
compute:
### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса
### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...
target-name: # дополнительные целевые объекты
...
### Для каждого целевого объекта вам нужно определить те же вычислительные ресурсы,
### но вы можете указать разные пути
compute:
### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса
### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...
Новый раздел вычислений представляет собой карту имен, выбранных пользователем, к объектам с свойством http_path. Каждый вычислительный ресурс имеет ключ, который используется в определении/конфигурации модели для указания, какой вычислительный ресурс вы хотите использовать для этой модели/выбора моделей. Мы рекомендуем выбирать имя, которое легко распознается как используемые вами вычислительные ресурсы, например, имя вычислительного ресурса в интерфейсе Databricks.
Вам нужно использовать один и тот же набор имен для вычислительных ресурсов во всех ваших выходных данных, хотя вы можете предоставить разные http_paths, что позволяет использовать разные вычислительные ресурсы в разных сценариях развертывания.
Чтобы настроить это внутри <Constant name="cloud" />, используйте функцию расширенных атрибутов в нужных окружениях:
compute:
Compute1:
http_path: /SOME/OTHER/PATH
Compute2:
http_path: /SOME/OTHER/PATH
Указание вычислительных ресурсов для моделей
Как и многие другие параметры конфигурации, вычислительные ресурсы (compute) для модели можно указать несколькими способами, используя databricks_compute.
В файле dbt_project.yml выбранный compute можно задать сразу для всех моделей в определённом каталоге:
...
models:
+databricks_compute: "Compute1" # используйте склад/кластер `Compute1` для всех моделей в проекте...
my_project:
clickstream:
+databricks_compute: "Compute2" # ...за исключением моделей в папке `clickstream`, которые будут использовать `Compute2`.
snapshots:
+databricks_compute: "Compute1" # все модели Snapshot настроены на использование `Compute1`.
Для отдельной модели вычислительные ресурсы могут быть указаны в конфигурации модели в вашем файле схемы.
models:
- name: table_model
config:
databricks_compute: Compute1
columns:
- name: id
data_type: int
В качестве альтернативы хранилище данных можно указать в конфигурации SQL‑файла модели.
{{
config(
materialized='table',
databricks_compute='Compute1'
)
}}
select * from {{ ref('seed') }}
Чтобы убедиться, что указанные вычислительные ресурсы используются, ищите строки в вашем dbt.log, такие как:
Адаптер Databricks ... использует вычислительный ресурс по умолчанию.
или
Адаптер Databricks ... использует вычислительный ресурс <имя вычислительного ресурса>.
Указание вычислительных ресурсов для моделей Python
Материализация python‑модели требует выполнения как SQL, так и Python‑кода.
В частности, если ваша python‑модель является инкрементальной, текущий шаблон выполнения предполагает запуск Python для создания staging‑таблицы, которая затем объединяется (merge) с целевой таблицей с помощью SQL.
Python‑код должен выполняться на all purpose кластере (или serverless кластере, см. Python Submission Methods), тогда как SQL‑код может выполняться как на all purpose кластере, так и на SQL Warehouse.
Когда вы указываете databricks_compute для python‑модели, в настоящее время вы задаёте только то вычислительное окружение, которое используется при выполнении SQL, специфичного для модели.
Если вы хотите использовать другое вычислительное окружение для выполнения самого Python‑кода, необходимо указать альтернативный compute в конфигурации модели.
Например:
def model(dbt, session):
dbt.config(
http_path="sql/protocolv1/..."
)
Если ваш вычислительный ресурс по умолчанию является SQL-складом, вам нужно будет указать http_path кластера общего назначения таким образом.
Сохранение описаний моделей
Поддерживается сохранение документации на уровне relation. Для получения дополнительной информации о настройке сохранения документации см. документацию.
Когда опция persist_docs настроена соответствующим образом, вы сможете
увидеть описания моделей в поле Comment команды describe [table] extended
или show table extended in [database] like '*'.
Конфигурации формата файлов по умолчанию
Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как
снимки и инкрементальная стратегия merge, вы захотите
использовать формат файлов Delta или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.
Это довольно удобно сделать, установив верхнеуровневую конфигурацию в вашем файле проекта:
models:
+file_format: delta # или hudi
seeds:
+file_format: delta # или hudi
snapshots:
+file_format: delta # или hudi
Материализованные представления и стриминговые таблицы
Материализованные представления и потоковые таблицы являются альтернативами инкрементальным таблицам, которые поддерживаются Delta Live Tables. См. Что такое Delta Live Tables? для получения дополнительной информации и примеров использования.
Чтобы использовать эти стратегии материализации, вам потребуется рабочее пространство, включенное для Unity Catalog и безсерверных SQL-складов.
{{ config(
materialized = 'materialized_view'
) }}
или
{{ config(
materialized = 'streaming_table'
) }}
Мы поддерживаем параметр on_configuration_change для большинства доступных свойств этих материализаций.
В следующей таблице приведено обобщение поддержки конфигураций:
| Loading table... |
{{ config(
materialized='materialized_view',
partition_by='id',
schedule = {
'cron': '0 0 * * * ? *',
'time_zone_value': 'Etc/UTC'
},
tblproperties={
'key': 'value'
},
) }}
select * from {{ ref('my_seed') }}
Подробности конфигурации
partition_by
partition_by работает так же, как для представлений и таблиц, т.е. может быть одним столбцом или массивом столбцов для разделения.
liquid_clustered_by
Доступно в версиях 1.11 или выше
liquid_clustered_by включает liquid clustering для материализованных представлений и стриминговых таблиц. Liquid clustering оптимизирует производительность запросов за счёт совместного размещения похожих данных в одних и тех же файлах, что особенно полезно для запросов с селективными фильтрами по кластеризованным столбцам.
Примечание: Нельзя использовать partition_by и liquid_clustered_by одновременно в одной материализации, так как Databricks не позволяет комбинировать эти возможности.
databricks_tags
Доступно в версиях 1.11 или выше
databricks_tags позволяет применять теги Unity Catalog к вашим материализованным представлениям и стриминговым таблицам для управления данными и их организации. Теги представляют собой пары ключ-значение и могут использоваться для классификации данных, политик контроля доступа и управления метаданными.
{{ config(
materialized='streaming_table',
databricks_tags={'pii': 'contains_email', 'team': 'analytics'}
) }}
Теги применяются с помощью операторов ALTER после создания материализации. После применения теги нельзя удалить через изменения конфигурации dbt-databricks. Для удаления тегов необходимо использовать Databricks напрямую или post-hook.
description
Как и в случае представлений и таблиц, добавление description в вашу конфигурацию приведет к добавлению комментария на уровне таблицы в вашу материализацию.
tblproperties
tblproperties работает так же, как для представлений и таблиц, с важным исключением: адаптер поддерживает список ключей, которые устанавливаются Databricks при создании материализованного представления или потоковой таблицы, которые игнорируются для определения изменений конфигурации.
schedule
Используйте это для установки расписания обновления для модели. Если вы используете ключ schedule, ключ cron обязателен в связанном словаре, но time_zone_value является необязательным (см. пример выше). Значение cron должно быть отформатировано, как это задокументировано Databricks.
Если расписание установлено на материализации в Databricks, а ваш проект dbt не указывает расписание для него (когда on_configuration_change установлено в apply), расписание обновления будет установлено на ручное при следующем запуске проекта.
Даже когда расписания установлены, dbt будет запрашивать, чтобы материализация обновлялась вручную при запуске.
query
Для материализованных представлений, если скомпилированный запрос для модели отличается от запроса в базе данных, мы примем действие, указанное в on_configuration_change.
Изменения в запросе в настоящее время не обнаруживаются для потоковых таблиц; см. следующий раздел для подробностей.
on_configuration_change
on_configuration_change поддерживается для материализованных представлений и потоковых таблиц, хотя две материализации обрабатывают это по-разному.
Материализованные представления
В настоящее время единственное изменение, которое можно применить без воссоздания материализованного представления в Databricks, - это обновление расписания. Это связано с ограничениями в SQL API Databricks.
Streaming Tables
Для стриминговых таблиц в настоящее время только изменения в партиционировании требуют, чтобы таблица была удалена и создана заново.
Для любых других поддерживаемых изменений конфигурации мы используем CREATE OR REFRESH (а также оператор ALTER для изменений расписания), чтобы применить эти изменения.
В настоящее время у адаптера нет механизма для определения того, изменилась ли SQL‑запрос стриминговой таблицы. Поэтому в этом случае, независимо от поведения, заданного в on_configuration_change, будет использован оператор create or refresh (при условии, что partitioned by не изменился). Это приведёт к тому, что обновлённый запрос будет применяться только к будущим строкам, без повторного выполнения для уже обработанных данных.
Если исходные данные всё ещё доступны, запуск с флагом --full-refresh повторно обработает доступные данные с использованием обновлённого текущего запроса.
Setting table properties
Table properties можно задавать в конфигурации таблиц или представлений с помощью параметра tblproperties:
{{ config(
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
'delta.autoOptimize.autoCompact' : 'true'
}
) }}
Эти свойства отправляются напрямую в Databricks без проверки в dbt, поэтому будьте внимательны при использовании этой функции. Вам нужно будет выполнить полное обновление инкрементальных материализаций, если вы измените их tblproperties.
Одним из применений этой функции является обеспечение совместимости таблиц delta с читателями iceberg с использованием Универсального формата.
{{ config(
tblproperties={
'delta.enableIcebergCompatV2' = 'true'
'delta.universalFormat.enabledFormats' = 'iceberg'
}
) }}
tblproperties могут быть указаны для моделей Python, но они будут применены через оператор ALTER после создания таблицы.
Это связано с ограничением в PySpark.