Перейти к основному содержимому

Конфигурации Databricks

Конфигурация таблиц

При материализации модели как table, вы можете включить несколько дополнительных конфигураций, специфичных для плагина dbt-databricks, в дополнение к стандартным конфигурациям моделей.

dbt-databricks v1.9 добавляет поддержку конфигурации table_format: iceberg. Попробуйте уже сейчас в ветке релизов dbt "Latest". Все остальные конфигурации таблиц также поддерживались в версии 1.8.

OptionDescriptionRequired?Model supportExample
table_formatНужно ли включать совместимость с Iceberg для данной materializationOptionalSQL, Pythoniceberg
file_format Формат файлов, который будет использоваться при создании таблиц (parquet, delta, hudi, csv, json, text, jdbc, orc, hive или libsvm).OptionalSQL, Pythondelta
location_rootСоздаваемая таблица использует указанный каталог для хранения данных. К этому пути будет добавлен алиас таблицы.OptionalSQL, Python/mnt/root
partition_byРазбивает создаваемую таблицу на партиции по указанным колонкам. Для каждой партиции создаётся отдельный каталог.OptionalSQL, Pythondate_day
liquid_clustered_by^Кластеризует создаваемую таблицу по указанным колонкам. Метод кластеризации основан на функциональности Liquid Clustering в Delta. Доступно начиная с dbt-databricks 1.6.2.OptionalSQL, Pythondate_day
auto_liquid_cluster+Создаваемая таблица автоматически кластеризуется Databricks. Доступно начиная с dbt-databricks 1.10.0OptionalSQL, Pythonauto_liquid_cluster: true
clustered_byКаждая партиция в создаваемой таблице будет разбита на фиксированное количество бакетов по указанным колонкам.OptionalSQL, Pythoncountry_code
bucketsКоличество бакетов, создаваемых при кластеризацииRequired if clustered_by is specifiedSQL, Python8
tblpropertiesTblproperties, которые будут установлены для создаваемой таблицыOptionalSQL, Python*{'this.is.my.key': 12}
databricks_tagsТеги, которые будут заданы для создаваемой таблицыOptionalSQL , Python {'my_tag': 'my_value'}
compressionУстанавливает алгоритм сжатия.OptionalSQL, Pythonzstd
Loading table...

* В настоящее время в PySpark нет API для задания tblproperties при создании таблицы, поэтому эта возможность в первую очередь предназначена для того, чтобы пользователи могли аннотировать таблицы, созданные из Python, с помощью tblproperties.

† When table_format is iceberg, file_format must be delta.

databricks_tags are applied via ALTER statements. Tags cannot be removed via dbt-databricks once applied. To remove tags, use Databricks directly or a post-hook.

^ Когда включён liquid_clustered_by, dbt-databricks выполняет операцию OPTIMIZE (Liquid Clustering) после каждого запуска. Чтобы отключить это поведение, установите переменную DATABRICKS_SKIP_OPTIMIZE=true, которую можно передать в команду запуска dbt (dbt run --vars "{'databricks_skip_optimize': true}") или задать как переменную окружения. См. issue #802.

+ Do not use liquid_clustered_by and auto_liquid_cluster on the same model.

В dbt-databricks v1.10 появилось несколько новых вариантов конфигурации моделей, доступных при включённом флаге use_materialization_v2. Подробнее см. документацию по флагам поведения Databricks.

Методы отправки Python

Доступно в версиях 1.9 и выше

В dbt-databricks v1.9 (попробуйте уже сейчас в треке релизов dbt «Latest») вы можете использовать следующие четыре варианта для submission_method:

  • all_purpose_cluster: Выполняет модель Python либо напрямую с использованием command api, либо загружая блокнот и создавая одноразовый запуск задания
  • job_cluster: Создает новый кластер заданий для выполнения загруженного блокнота как одноразового запуска задания
  • serverless_cluster: Использует безсерверный кластер для выполнения загруженного блокнота как одноразового запуска задания
  • workflow_job: Создает/обновляет повторно используемый рабочий процесс и загруженный блокнот для выполнения на кластерах общего назначения, задания или безсерверных кластерах.
    предупреждение

    Этот подход дает вам максимальную гибкость, но создаст постоянные артефакты в Databricks (рабочий процесс), которые пользователи могут запускать вне dbt.

В настоящее время мы находимся в переходном периоде, когда существует разрыв между старыми методами отправки (которые были сгруппированы по вычислениям) и логически различными методами отправки (команда, запуск задания, рабочий процесс).

Таким образом, поддерживаемая матрица конфигурации несколько сложна:

КонфигурацияИспользованиеПо умолчаниюall_purpose_cluster*job_clusterserverless_clusterworkflow_job
create_notebookесли false, используйте Command API, иначе загрузите блокнот и используйте запуск заданияfalse
timeoutмаксимальное время ожидания выполнения команды/задания0 (Без таймаута)
job_cluster_configнастраивает новый кластер для выполнения модели{}
access_control_listнапрямую настраивает контроль доступа для задания{}
packagesсписок пакетов для установки на выполняющем кластере[]
index_urlURL для установки packagesNone (использует pypi)
additional_libsнапрямую настраивает библиотеки[]
python_job_configдополнительная конфигурация для заданий/рабочих процессов (см. таблицу ниже){}
cluster_idидентификатор существующего кластера общего назначения для выполненияNone
http_pathпуть к существующему кластеру общего назначения для выполненияNone
Loading table...

* Только timeout и cluster_id/http_path поддерживаются, когда create_notebook равно false

С введением метода отправки workflow_job, мы выбрали дальнейшую сегрегацию конфигурации отправки модели Python под верхнеуровневую конфигурацию, названную python_job_config. Это позволяет конфигурационным опциям для заданий и рабочих процессов быть пространственно именованными таким образом, чтобы они не мешали другим конфигурациям модели, что позволяет нам быть гораздо более гибкими в том, что поддерживается для выполнения заданий.

Матрица поддержки для этой функции разделена на workflow_job и все остальные (предполагая all_purpose_cluster с create_notebook==true). Каждая опция конфигурации, указанная ниже, должна быть вложена под python_job_config:

КонфигурацияИспользованиеПо умолчаниюworkflow_jobВсе остальные
nameИмя, которое будет присвоено (или использовано для поиска) созданному рабочему процессуNone
grantsУпрощенный способ указать контроль доступа для рабочего процесса{}
existing_job_idИдентификатор для использования при поиске созданного рабочего процесса (вместо name)None
post_hook_tasksЗадачи, которые будут включены после выполнения блокнота модели[]
additional_task_settingsДополнительная конфигурация задачи, которая будет включена в задачу модели{}
Другие настройки запуска заданияКонфигурация будет скопирована в запрос, вне задачи моделиNone
Другие настройки рабочего процессаКонфигурация будет скопирована в запрос, вне задачи моделиNone
Loading table...

Этот пример использует новые опции конфигурации из предыдущей таблицы:

schema.yml
models:
- name: my_model
config:
submission_method: workflow_job

# Определите кластер заданий для создания для выполнения этого рабочего процесса
# В качестве альтернативы можно указать cluster_id для использования существующего кластера или не указывать ни одного, чтобы использовать безсерверный кластер
job_cluster_config:
spark_version: "15.3.x-scala2.12"
node_type_id: "rd-fleet.2xlarge"
runtime_engine: "{{ var('job_cluster_defaults.runtime_engine') }}"
data_security_mode: "{{ var('job_cluster_defaults.data_security_mode') }}"
autoscale: { "min_workers": 1, "max_workers": 4 }

python_job_config:
# Эти настройки передаются, как есть, в запрос
email_notifications: { on_failure: ["me@example.com"] }
max_retries: 2

name: my_workflow_name

# Переопределите настройки для задачи dbt вашей модели. Например, вы можете
# изменить ключ задачи
additional_task_settings: { "task_key": "my_dbt_task" }

# Определите задачи для выполнения до/после модели
# Этот пример предполагает, что вы уже загрузили блокнот в /my_notebook_path для выполнения оптимизации и очистки
post_hook_tasks:
[
{
"depends_on": [{ "task_key": "my_dbt_task" }],
"task_key": "OPTIMIZE_AND_VACUUM",
"notebook_task":
{ "notebook_path": "/my_notebook_path", "source": "WORKSPACE" },
},
]

# Упрощенная структура, вместо необходимости указывать разрешения отдельно для каждого пользователя
grants:
view: [{ "group_name": "marketing-team" }]
run: [{ "user_name": "other_user@example.com" }]
manage: []

Настройка колонок

Доступно в версиях 1.10 и выше

При материализации моделей различных типов вы можете указывать несколько необязательных конфигураций на уровне колонок, которые специфичны для плагина dbt-databricks, в дополнение к стандартным настройкам колонок. Поддержка тегов колонок и масок колонок была добавлена в dbt-databricks версии 1.10.4.

OptionDescriptionRequired?Model supportMaterialization supportExample
databricks_tagsTags to be set on individual columnsOptionalSQL†, Python†Table, Incremental, Materialized View, Streaming Table{'data_classification': 'pii'}
column_maskColumn mask configuration for dynamic data masking. Accepts function and optional using_columns properties*OptionalSQL, PythonTable, Incremental, Streaming Table{'function': 'my_catalog.my_schema.mask_email'}
Loading table...

* using_columns supports all parameter types listed in Databricks column mask parameters.

databricks_tags are applied via ALTER statements. Tags cannot be removed via dbt-databricks once applied. To remove tags, use Databricks directly or a post-hook.

This example uses the column-level configurations in the previous table:

schema.yml
models:
- name: customers
columns:
- name: customer_id
databricks_tags:
data_classification: "public"
- name: email
databricks_tags:
data_classification: "pii"
column_mask:
function: my_catalog.my_schema.mask_email
using_columns: "customer_id, 'literal string'"

Инкрементальные модели

Доступно в версиях 1.9 и выше

Ломающее изменение в v1.11.0
dbt-databricks v1.11.0 требует Databricks Runtime 12.2 LTS или выше для инкрементальных моделей

В этой версии добавлено исправление проблемы с несоответствием порядка колонок в инкрементальных моделях за счёт использования синтаксиса Databricks INSERT BY NAME (доступен начиная с DBR 12.2). Это предотвращает повреждение данных, которое могло возникать при изменении порядка колонок в моделях с настройкой on_schema_change: sync_all_columns.

Если вы используете более старую версию runtime:

  • Зафиксируйте версию dbt-databricks на 1.10.x
  • Или обновитесь до DBR 12.2 LTS или выше

Это ломающее изменение затрагивает все стратегии инкрементальной загрузки: append, insert_overwrite, replace_where, delete+insert и merge (через создание промежуточной таблицы).

Подробнее об изменениях в v1.11.0 см. в dbt-databricks v1.11.0 changelog.

Плагин dbt-databricks в значительной степени опирается на конфигурацию incremental_strategy. Эта настройка определяет, как инкрементальная материализация будет собирать модель при запусках после первого. Она может принимать одно из шести значений:

  • append: Вставляет новые записи, не обновляя и не перезаписывая существующие данные.
  • insert_overwrite: Если задан partition_by, перезаписывает соответствующие партиции в table новыми данными. Если partition_by не указан, перезаписывает всю таблицу новыми данными.
  • merge (по умолчанию; только для форматов файлов Delta и Hudi): Сопоставляет записи на основе unique_key, обновляя существующие записи и вставляя новые. (Если unique_key не указан, все новые данные вставляются — аналогично append.)
  • replace_where (только для формата файлов Delta): Сопоставляет записи на основе incremental_predicates, заменяя все записи в существующей таблице, которые соответствуют предикатам, на записи из новых данных с теми же предикатами. (Если incremental_predicates не указаны, все новые данные вставляются — аналогично append.)
  • delete+insert (только для формата файлов Delta, доступно в v1.11+): Сопоставляет записи на основе обязательного unique_key, удаляет совпадающие записи и вставляет новые. Дополнительно может применяться фильтрация с помощью incremental_predicates.
  • microbatch (только для формата файлов Delta): Реализует стратегию microbatch, используя replace_where с предикатами, которые генерируются на основе event_time.

Каждая из этих стратегий имеет свои плюсы и минусы, которые мы обсудим ниже. Как и в случае любой конфигурации модели, incremental_strategy может быть указана в dbt_project.yml или в блоке config() файла модели.

Стратегия append

Следуя стратегии append, dbt выполнит оператор insert into со всеми новыми данными. Привлекательность этой стратегии заключается в том, что она проста и функциональна на всех платформах, типах файлов, методах подключения и версиях Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому она, вероятно, будет вставлять дублирующиеся записи для многих источников данных.

databricks_incremental.sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- Все строки, возвращаемые этим запросом, будут добавлены к существующей таблице

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

Стратегия insert_overwrite

Стратегия insert_overwrite обновляет данные в таблице, заменяя существующие записи, а не просто добавляя новые. Эта стратегия наиболее эффективна, когда она указана вместе с параметром partition_by или liquid_clustered_by в конфигурации модели — это помогает определить конкретные партиции или кластеры, на которые влияет ваш запрос. dbt выполнит атомарный оператор insert into ... replace on, который динамически заменяет все партиции или кластеры, затронутые запросом, вместо перестроения всей таблицы целиком.

Важно! При использовании этой инкрементальной стратегии обязательно повторно выбирайте (re-select) все релевантные данные для соответствующей партиции или кластера.

При использовании liquid_clustered_by ключи replace on будут совпадать с ключами liquid_clustered_by (аналогично поведению partition_by).

Если установить use_replace_on_for_insert_overwrite в True (в SQL warehouses или при использовании кластерных вычислений), dbt будет динамически перезаписывать партиции и заменять только те партиции или кластеры, которые возвращаются запросом модели. В этом случае dbt выполняет оператор insert overwrite с partitionOverwriteMode='dynamic', что помогает сократить количество ненужных перезаписей и повысить производительность.

Если установить use_replace_on_for_insert_overwrite в False в SQL warehouses, dbt будет обрезать (полностью очищать) таблицу перед вставкой новых данных. Это приводит к замене всех строк таблицы при каждом запуске модели, что может увеличить время выполнения и стоимость для больших наборов данных.

Если вы не укажете partition_by или liquid_clustered_by, стратегия insert_overwrite будет атомарно заменять всё содержимое таблицы, перезаписывая все существующие данные только новыми записями. При этом схема столбцов таблицы остаётся неизменной. В некоторых ограниченных случаях такое поведение может быть желательным, так как оно минимизирует простой во время перезаписи содержимого таблицы. Эта операция сопоставима с выполнением truncate и insert в других базах данных. Для атомарной замены таблиц в формате Delta вместо этого используйте материализацию table (которая выполняет create or replace).

databricks_incremental.sql
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}

/*
Каждый раздел, возвращаемый этим запросом, будет перезаписан
при выполнении этой модели
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from new_events
group by 1

Стратегия merge

Инкрементальная стратегия merge требует:

  • file_format: delta или hudi
  • Databricks Runtime 5.1 и выше для формата файла delta
  • Apache Spark для формата файла hudi

Адаптер Databricks выполнит атомарный оператор merge, аналогичный поведению слияния по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевому столбцу. Если unique_key не указан, dbt пропустит критерии совпадения и просто вставит все новые записи (аналогично стратегии append).

Указание merge в качестве инкрементальной стратегии является необязательным, так как это стратегия по умолчанию, используемая, когда никакая не указана.

merge_incremental.sql
{{ config(
materialized='incremental',
file_format='delta', # или 'hudi'
unique_key='user_id',
incremental_strategy='merge'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Начиная с версии 1.9, поведение merge можно изменить с помощью следующих дополнительных параметров конфигурации:

  • target_alias, source_alias: Алиасы для целевой и исходной таблиц, которые позволяют более наглядно описывать условия merge. По умолчанию используются DBT_INTERNAL_DEST и DBT_INTERNAL_SOURCE соответственно.
  • skip_matched_step: Если установлено в true, секция matched в операторе merge не будет включена.
  • skip_not_matched_step: Если установлено в true, секция not matched не будет включена.
  • matched_condition: Условие, применяемое к секции WHEN MATCHED. Для написания условного выражения следует использовать target_alias и source_alias, например: DBT_INTERNAL_DEST.col1 = hash(DBT_INTERNAL_SOURCE.col2, DBT_INTERNAL_SOURCE.col3). Это условие дополнительно ограничивает набор строк, считающихся совпавшими.
  • not_matched_condition: Условие, применяемое к секции WHEN NOT MATCHED [BY TARGET]. Это условие дополнительно ограничивает набор строк в целевой таблице, которые не совпадают с источником и будут вставлены в результирующую таблицу.
  • not_matched_by_source_condition: Условие, применяемое как дополнительный фильтр в секции WHEN NOT MATCHED BY SOURCE. Используется только совместно с not_matched_by_source_action.
  • not_matched_by_source_action: Действие, которое выполняется при выполнении условия. Настраивается как выражение. Например: not_matched_by_source_action: "update set t.attr1 = 'deleted', t.tech_change_ts = current_timestamp()".
  • merge_with_schema_evolution: Если установлено в true, оператор merge будет включать секцию WITH SCHEMA EVOLUTION.

Для получения более подробной информации о значении каждого условия слияния, пожалуйста, обратитесь к документации Databricks.

Ниже приведен пример, демонстрирующий использование этих новых опций:

merge_incremental_options.sql
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
target_alias='t',
source_alias='s',
matched_condition='t.tech_change_ts < s.tech_change_ts',
not_matched_condition='s.attr1 IS NOT NULL',
not_matched_by_source_condition='t.tech_change_ts < current_timestamp()',
not_matched_by_source_action='delete',
merge_with_schema_evolution=true
) }}

select
id,
attr1,
attr2,
tech_change_ts
from
{{ ref('source_table') }} as s

Стратегия replace_where

Инкрементальная стратегия replace_where требует:

  • file_format: delta
  • Databricks Runtime 12.0 и выше

dbt выполнит атомарный оператор replace where, который избирательно перезаписывает данные, соответствующие одному или нескольким incremental_predicates, указанным в виде строки или массива. Только строки, соответствующие предикатам, будут вставлены. Если incremental_predicates не указаны, dbt выполнит атомарную вставку, как в случае append.

предупреждение

replace_where вставляет данные в столбцы в порядке их предоставления, а не по имени столбца. Если вы измените порядок столбцов и данные совместимы с существующей схемой, вы можете незаметно вставить значения в неожиданный столбец. Если входные данные несовместимы с существующей схемой, вы получите ошибку.

replace_where_incremental.sql
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'replace_where'
incremental_predicates = 'user_id >= 10000' # Никогда не заменяйте пользователей с id < 10000
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Стратегия delete+insert

Доступно в версиях 1.11 или выше

Инкрементальная стратегия delete+insert требует:

  • file_format: delta
  • Обязательной конфигурации unique_key
  • Databricks Runtime 12.2 LTS или выше

The delete+insert strategy is a simpler alternative to the merge strategy for cases where you want to replace matching records without the complexity of updating specific columns. This strategy works in two steps:

  1. Delete: Remove all rows from the target table where the unique_key matches rows in the new data.
  2. Insert: Insert all new rows from the staging data.

This strategy is particularly useful when:

  • You want to replace entire records rather than update specific columns
  • Your business logic requires a clean "remove and replace" approach
  • You need a simpler incremental strategy than merge for full record replacement

При использовании Databricks Runtime версии 17.1 или выше dbt применяет эффективный синтаксис INSERT INTO ... REPLACE ON для атомарного выполнения этой операции. Для более старых версий runtime dbt выполняет отдельные операторы DELETE и INSERT.

При необходимости вы можете использовать incremental_predicates, чтобы дополнительно отфильтровать обрабатываемые записи, получив больший контроль над тем, какие строки удаляются и вставляются.

delete_insert_incremental.sql
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy='delete+insert',
unique_key='user_id'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from new_events
group by 1

Стратегия microbatch

Доступно в версиях 1.9 и выше

Адаптер Databricks реализует стратегию microbatch, используя replace_where. Обратите внимание на требования и предупреждения, указанные выше для replace_where. Дополнительную информацию об этой стратегии см. на странице справки microbatch.

В следующем примере в таблице events добавлен столбец event_time, называемый ts, в ее схеме.

microbatch_incremental.sql
{{ config(
materialized='incremental',
file_format='delta',
incremental_strategy = 'microbatch'
event_time='date' # Используйте 'date' как зерно для этой таблицы микропакетов
) }}

with new_events as (

select * from {{ ref('events') }}

)

select
user_id,
date,
count(*) as visits

from events
group by 1, 2

Python model configuration

Адаптер Databricks поддерживает Python-модели. Databricks использует PySpark в качестве фреймворка обработки для этих моделей.

Методы отправки: Databricks поддерживает несколько различных механизмов отправки кода PySpark, каждый из которых имеет свои относительные преимущества. Одни лучше подходят для итеративной разработки, другие — для более дешёвых продакшн-развёртываний. Доступны следующие варианты:

  • all_purpose_cluster (по умолчанию): dbt будет запускать вашу Python-модель, используя идентификатор кластера, настроенный как cluster в вашем профиле подключения или для конкретной модели. Эти кластеры дороже, но значительно более отзывчивы. Мы рекомендуем использовать интерактивный all-purpose кластер для более быстрой итерации при разработке.
    • create_notebook: True: dbt загрузит скомпилированный PySpark-код вашей модели в ноутбук в пространстве имён /Shared/dbt_python_model/{schema}, где {schema} — это настроенная схема модели, и выполнит этот ноутбук с использованием all-purpose кластера. Преимущество этого подхода в том, что вы можете легко открыть ноутбук в интерфейсе Databricks для отладки или тонкой настройки сразу после выполнения модели. Не забудьте скопировать все изменения обратно в код вашей dbt-модели .py перед повторным запуском.
    • create_notebook: False (по умолчанию): dbt будет использовать Command API, который работает немного быстрее.
  • job_cluster: dbt загрузит скомпилированный PySpark-код вашей модели в ноутбук в пространстве имён /Shared/dbt_python_model/{schema}, где {schema} — это настроенная схема модели, и выполнит этот ноутбук, используя краткоживущий jobs-кластер. Для каждой Python-модели Databricks потребуется поднять кластер, выполнить PySpark-трансформацию модели, а затем остановить кластер. Поэтому job-кластеры требуют больше времени до и после выполнения модели, но они также дешевле, поэтому мы рекомендуем их для длительно выполняющихся Python-моделей в продакшене. Чтобы использовать метод отправки job_cluster, ваша модель должна быть настроена с job_cluster_config, который определяет свойства ключ-значение для new_cluster, как описано в JobRunsSubmit API.

Вы можете настроить submission_method для каждой модели всеми стандартными способами задания конфигурации:

def model(dbt, session):
dbt.config(
submission_method="all_purpose_cluster",
create_notebook=True,
cluster_id="abcd-1234-wxyz"
)
...
models:
- name: my_python_model
config:
submission_method: job_cluster
job_cluster_config:
spark_version: ...
node_type_id: ...
# dbt_project.yml
models:
project_name:
subfolder:
# set defaults for all .py models defined in this subfolder
+submission_method: all_purpose_cluster
+create_notebook: False
+cluster_id: abcd-1234-wxyz

Если конфигурация не задана, dbt-spark будет использовать встроенные значения по умолчанию: all-purpose кластер (на основе cluster в вашем профиле подключения) без создания ноутбука. Адаптер dbt-databricks по умолчанию будет использовать кластер, настроенный в http_path. Мы рекомендуем явно настраивать кластеры для Python-моделей в проектах Databricks.

Installing packages: When using all-purpose clusters, we recommend installing packages which you will be using to run your Python models.

Related docs:

Выбор вычислительных ресурсов для каждой модели

Начиная с версии 1.7.2, вы можете назначать, какой вычислительный ресурс использовать для каждой модели. Для SQL-моделей вы можете выбрать SQL Warehouse (безсерверный или предоставленный) или кластер общего назначения. Для получения подробной информации о том, как эта функция взаимодействует с моделями Python, см. Указание вычислительных ресурсов для моделей Python.

примечание

Это необязательная настройка. Если вы не настроите это, как показано ниже, мы будем использовать вычислительные ресурсы, указанные в http_path в верхнем уровне раздела output в вашем профиле. Это также вычислительные ресурсы, которые будут использоваться для задач, не связанных с конкретной моделью, таких как сбор метаданных для всех таблиц в схеме.

Чтобы воспользоваться этой возможностью, вам нужно будет добавить блоки вычислений в ваш профиль:

profile.yml

profile-name:
target: target-name # это целевой объект по умолчанию
outputs:
target-name:
type: databricks
catalog: optional catalog name if you are using Unity Catalog
schema: schema name # Обязательно
host: yourorg.databrickshost.com # Обязательно

### Этот путь используется как вычислительный ресурс по умолчанию
http_path: /sql/your/http/path # Обязательно

### Новый раздел вычислений
compute:

### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса

### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...

target-name: # дополнительные целевые объекты
...
### Для каждого целевого объекта вам нужно определить те же вычислительные ресурсы,
### но вы можете указать разные пути
compute:

### Имя, которое вы будете использовать для ссылки на альтернативный вычислительный ресурс
Compute1:
http_path: '/sql/your/http/path' # Обязательно для каждого альтернативного вычислительного ресурса

### Третий именованный вычислительный ресурс, используйте любое имя, которое вам нравится
Compute2:
http_path: '/some/other/path' # Обязательно для каждого альтернативного вычислительного ресурса
...

Новый раздел вычислений представляет собой карту имен, выбранных пользователем, к объектам с свойством http_path. Каждый вычислительный ресурс имеет ключ, который используется в определении/конфигурации модели для указания, какой вычислительный ресурс вы хотите использовать для этой модели/выбора моделей. Мы рекомендуем выбирать имя, которое легко распознается как используемые вами вычислительные ресурсы, например, имя вычислительного ресурса в интерфейсе Databricks.

примечание

Вам нужно использовать один и тот же набор имен для вычислительных ресурсов во всех ваших выходных данных, хотя вы можете предоставить разные http_paths, что позволяет использовать разные вычислительные ресурсы в разных сценариях развертывания.

Чтобы настроить это внутри <Constant name="cloud" />, используйте функцию расширенных атрибутов в нужных окружениях:


compute:
Compute1:
http_path: /SOME/OTHER/PATH
Compute2:
http_path: /SOME/OTHER/PATH

Указание вычислительных ресурсов для моделей

Как и многие другие параметры конфигурации, вычислительные ресурсы (compute) для модели можно указать несколькими способами, используя databricks_compute. В файле dbt_project.yml выбранный compute можно задать сразу для всех моделей в определённом каталоге:

dbt_project.yml

...

models:
+databricks_compute: "Compute1" # используйте склад/кластер `Compute1` для всех моделей в проекте...
my_project:
clickstream:
+databricks_compute: "Compute2" # ...за исключением моделей в папке `clickstream`, которые будут использовать `Compute2`.

snapshots:
+databricks_compute: "Compute1" # все модели Snapshot настроены на использование `Compute1`.

Для отдельной модели вычислительные ресурсы могут быть указаны в конфигурации модели в вашем файле схемы.

schema.yml

models:
- name: table_model
config:
databricks_compute: Compute1
columns:
- name: id
data_type: int

В качестве альтернативы хранилище данных можно указать в конфигурации SQL‑файла модели.

model.sql

{{
config(
materialized='table',
databricks_compute='Compute1'
)
}}
select * from {{ ref('seed') }}

Чтобы убедиться, что указанные вычислительные ресурсы используются, ищите строки в вашем dbt.log, такие как:

Адаптер Databricks ... использует вычислительный ресурс по умолчанию.

или

Адаптер Databricks ... использует вычислительный ресурс <имя вычислительного ресурса>.

Указание вычислительных ресурсов для моделей Python

Материализация python‑модели требует выполнения как SQL, так и Python‑кода.
В частности, если ваша python‑модель является инкрементальной, текущий шаблон выполнения предполагает запуск Python для создания staging‑таблицы, которая затем объединяется (merge) с целевой таблицей с помощью SQL.

Python‑код должен выполняться на all purpose кластере (или serverless кластере, см. Python Submission Methods), тогда как SQL‑код может выполняться как на all purpose кластере, так и на SQL Warehouse.

Когда вы указываете databricks_compute для python‑модели, в настоящее время вы задаёте только то вычислительное окружение, которое используется при выполнении SQL, специфичного для модели.
Если вы хотите использовать другое вычислительное окружение для выполнения самого Python‑кода, необходимо указать альтернативный compute в конфигурации модели.

Например:

model.py

def model(dbt, session):
dbt.config(
http_path="sql/protocolv1/..."
)

Если ваш вычислительный ресурс по умолчанию является SQL-складом, вам нужно будет указать http_path кластера общего назначения таким образом.

Сохранение описаний моделей

Поддерживается сохранение документации на уровне relation. Для получения дополнительной информации о настройке сохранения документации см. документацию.

Когда опция persist_docs настроена соответствующим образом, вы сможете увидеть описания моделей в поле Comment команды describe [table] extended или show table extended in [database] like '*'.

Конфигурации формата файлов по умолчанию

Чтобы получить доступ к расширенным функциям инкрементальных стратегий, таким как снимки и инкрементальная стратегия merge, вы захотите использовать формат файлов Delta или Hudi в качестве формата файлов по умолчанию при материализации моделей как таблиц.

Это довольно удобно сделать, установив верхнеуровневую конфигурацию в вашем файле проекта:

dbt_project.yml
models:
+file_format: delta # или hudi

seeds:
+file_format: delta # или hudi

snapshots:
+file_format: delta # или hudi

Материализованные представления и стриминговые таблицы

Материализованные представления и потоковые таблицы являются альтернативами инкрементальным таблицам, которые поддерживаются Delta Live Tables. См. Что такое Delta Live Tables? для получения дополнительной информации и примеров использования.

Чтобы использовать эти стратегии материализации, вам потребуется рабочее пространство, включенное для Unity Catalog и безсерверных SQL-складов.

materialized_view.sql
{{ config(
materialized = 'materialized_view'
) }}

или

streaming_table.sql
{{ config(
materialized = 'streaming_table'
) }}

Мы поддерживаем параметр on_configuration_change для большинства доступных свойств этих материализаций.
В следующей таблице приведено обобщение поддержки конфигураций:

Концепция DatabricksИмя конфигаПоддержка MV/STВерсия
PARTITIONED BYpartition_byMV/STВсе
CLUSTER BYliquid_clustered_byMV/STv1.11+
COMMENTdescriptionMV/STВсе
TBLPROPERTIEStblpropertiesMV/STВсе
TAGSdatabricks_tagsMV/STv1.11+
SCHEDULE CRONschedule: { 'cron': '\<cron schedule\>', 'time_zone_value': '\<time zone value\>' }MV/STВсе
queryопределяется SQL вашего modelon_configuration_change только для MVВсе
Loading table...
mv_example.sql

{{ config(
materialized='materialized_view',
partition_by='id',
schedule = {
'cron': '0 0 * * * ? *',
'time_zone_value': 'Etc/UTC'
},
tblproperties={
'key': 'value'
},
) }}
select * from {{ ref('my_seed') }}

Подробности конфигурации

partition_by

partition_by работает так же, как для представлений и таблиц, т.е. может быть одним столбцом или массивом столбцов для разделения.

liquid_clustered_by

Доступно в версиях 1.11 или выше

liquid_clustered_by включает liquid clustering для материализованных представлений и стриминговых таблиц. Liquid clustering оптимизирует производительность запросов за счёт совместного размещения похожих данных в одних и тех же файлах, что особенно полезно для запросов с селективными фильтрами по кластеризованным столбцам.

Примечание: Нельзя использовать partition_by и liquid_clustered_by одновременно в одной материализации, так как Databricks не позволяет комбинировать эти возможности.

databricks_tags

Доступно в версиях 1.11 или выше

databricks_tags позволяет применять теги Unity Catalog к вашим материализованным представлениям и стриминговым таблицам для управления данными и их организации. Теги представляют собой пары ключ-значение и могут использоваться для классификации данных, политик контроля доступа и управления метаданными.

{{ config(
materialized='streaming_table',
databricks_tags={'pii': 'contains_email', 'team': 'analytics'}
) }}

Теги применяются с помощью операторов ALTER после создания материализации. После применения теги нельзя удалить через изменения конфигурации dbt-databricks. Для удаления тегов необходимо использовать Databricks напрямую или post-hook.

description

Как и в случае представлений и таблиц, добавление description в вашу конфигурацию приведет к добавлению комментария на уровне таблицы в вашу материализацию.

tblproperties

tblproperties работает так же, как для представлений и таблиц, с важным исключением: адаптер поддерживает список ключей, которые устанавливаются Databricks при создании материализованного представления или потоковой таблицы, которые игнорируются для определения изменений конфигурации.

schedule

Используйте это для установки расписания обновления для модели. Если вы используете ключ schedule, ключ cron обязателен в связанном словаре, но time_zone_value является необязательным (см. пример выше). Значение cron должно быть отформатировано, как это задокументировано Databricks. Если расписание установлено на материализации в Databricks, а ваш проект dbt не указывает расписание для него (когда on_configuration_change установлено в apply), расписание обновления будет установлено на ручное при следующем запуске проекта. Даже когда расписания установлены, dbt будет запрашивать, чтобы материализация обновлялась вручную при запуске.

query

Для материализованных представлений, если скомпилированный запрос для модели отличается от запроса в базе данных, мы примем действие, указанное в on_configuration_change. Изменения в запросе в настоящее время не обнаруживаются для потоковых таблиц; см. следующий раздел для подробностей.

on_configuration_change

on_configuration_change поддерживается для материализованных представлений и потоковых таблиц, хотя две материализации обрабатывают это по-разному.

Материализованные представления

В настоящее время единственное изменение, которое можно применить без воссоздания материализованного представления в Databricks, - это обновление расписания. Это связано с ограничениями в SQL API Databricks.

Streaming Tables

Для стриминговых таблиц в настоящее время только изменения в партиционировании требуют, чтобы таблица была удалена и создана заново.
Для любых других поддерживаемых изменений конфигурации мы используем CREATE OR REFRESH (а также оператор ALTER для изменений расписания), чтобы применить эти изменения.

В настоящее время у адаптера нет механизма для определения того, изменилась ли SQL‑запрос стриминговой таблицы. Поэтому в этом случае, независимо от поведения, заданного в on_configuration_change, будет использован оператор create or refresh (при условии, что partitioned by не изменился). Это приведёт к тому, что обновлённый запрос будет применяться только к будущим строкам, без повторного выполнения для уже обработанных данных.

Если исходные данные всё ещё доступны, запуск с флагом --full-refresh повторно обработает доступные данные с использованием обновлённого текущего запроса.

Setting table properties

Table properties можно задавать в конфигурации таблиц или представлений с помощью параметра tblproperties:

with_table_properties.sql
{{ config(
tblproperties={
'delta.autoOptimize.optimizeWrite' : 'true',
'delta.autoOptimize.autoCompact' : 'true'
}
) }}
предупреждение

Эти свойства отправляются напрямую в Databricks без проверки в dbt, поэтому будьте внимательны при использовании этой функции. Вам нужно будет выполнить полное обновление инкрементальных материализаций, если вы измените их tblproperties.

Одним из применений этой функции является обеспечение совместимости таблиц delta с читателями iceberg с использованием Универсального формата.

{{ config(
tblproperties={
'delta.enableIcebergCompatV2' = 'true'
'delta.universalFormat.enabledFormats' = 'iceberg'
}
) }}

tblproperties могут быть указаны для моделей Python, но они будут применены через оператор ALTER после создания таблицы. Это связано с ограничением в PySpark.

Нашли ошибку?

0
Loading