Перейти к основному содержимому

Конфигурации Snowflake

Формат таблиц Iceberg

Материалы про таблицы Snowflake Iceberg были перенесены на новую страницу!

Динамические таблицы

Адаптер Snowflake поддерживает dynamic tables. Этот тип материализации является специфичным для Snowflake, что означает, что некоторые конфигурации моделей, которые обычно приходят «в комплекте» с dbt-core (например, как у view), могут быть недоступны для динамических таблиц. Со временем, в будущих патчах и версиях, этот разрыв будет сокращаться. Несмотря на то, что данная материализация специфична для Snowflake, она во многом следует реализации материализованных представлений. В частности, динамические таблицы поддерживают настройку on_configuration_change. Динамические таблицы поддерживаются со следующими параметрами конфигурации:

Подробнее об этих параметрах можно узнать в документации Snowflake:

Target lag

Snowflake поддерживает два сценария конфигурации для планирования автоматических обновлений:

  • По времени — Указывается значение в формате <int> { seconds | minutes | hours | days }. Например, если динамическую таблицу нужно обновлять каждые 30 минут, используйте target_lag='30 minutes'.
  • Downstream — Применимо, когда динамическая таблица используется другими динамическими таблицами. В этом сценарии target_lag='downstream' позволяет управлять обновлениями на целевом уровне, а не на каждом уровне цепочки.

Подробнее о target_lag см. в документации Snowflake. Обратите внимание, что Snowflake поддерживает минимальный target lag от 1 минуты.

Ограничения

Как и в случае с материализованными представлениями на большинстве платформ, динамические таблицы имеют ряд ограничений. Среди наиболее важных:

  • SQL для динамических таблиц имеет ограниченный набор возможностей.
  • SQL динамической таблицы нельзя обновить; таблица должна быть пересоздана через --full-refresh (DROP/CREATE).
  • Динамические таблицы не могут зависеть от: materialized views, external tables, streams.
  • Динамические таблицы не могут ссылаться на view, которые находятся ниже по цепочке от другой динамической таблицы.

Дополнительную информацию об ограничениях динамических таблиц см. в документации Snowflake.

Ограничения со стороны dbt: следующие возможности dbt не поддерживаются:

Устранение неполадок с динамическими таблицами

Если модель динамической таблицы не удаётся повторно выполнить после первого запуска и появляется следующая ошибка:

SnowflakeDynamicTableConfig.__init__() missing 6 required positional arguments: 'name', 'schema_name', 'database_name', 'query', 'target_lag', and 'snowflake_warehouse'

Убедитесь, что параметр QUOTED_IDENTIFIERS_IGNORE_CASE в вашем аккаунте установлен в FALSE.

Временные таблицы

При инкрементальных merge-операциях в Snowflake dbt предпочитает использовать view, а не temporary table. Это позволяет избежать дополнительной записи в базу данных, которую инициирует временная таблица, и сократить время компиляции.

Тем не менее, существуют ситуации, когда временная таблица может быть быстрее или безопаснее. Конфигурация tmp_relation_type позволяет явно выбрать использование временных таблиц для инкрементальных сборок. Она задаётся как часть конфигурации модели.

Для гарантии корректности инкрементная модель со стратегией delete+insert и определённым unique_key требует использования временной таблицы; попытка заменить её на view приведёт к ошибке.

Определение в project YAML:

dbt_project.yml
name: my_project

...

models:
<resource-path>:
+tmp_relation_type: table | view ## If not defined, view is the default.

В формате конфигурации SQL-файла модели:

dbt_model.sql
{{ config(
tmp_relation_type="table | view", ## If not defined, view is the default.
) }}

Transient таблицы

Snowflake поддерживает создание transient tables. Snowflake не хранит историю для таких таблиц, что может существенно снизить стоимость хранения данных. Transient таблицы участвуют в time travel в ограниченном объёме: по умолчанию период хранения составляет 1 день и отсутствует fail-safe период. При принятии решения о конфигурации моделей dbt как transient необходимо учитывать эти компромиссы. По умолчанию все таблицы Snowflake, создаваемые dbt, являются transient.

Настройка transient таблиц в dbt_project.yml

Целую папку (или пакет) можно настроить как transient (или наоборот), добавив соответствующую строку в файл dbt_project.yml. Эта конфигурация работает так же, как и остальные конфигурации моделей, определённые в dbt_project.yml.

dbt_project.yml
name: my_project

...

models:
+transient: false
my_project:
...

Настройка transient для конкретной модели

Конкретную модель можно настроить как transient, установив конфигурацию transient в true.

my_table.sql
{{ config(materialized='table', transient=true) }}

select * from ...

Теги запросов (Query tags)

Query tags — это параметр Snowflake, который может быть полезен при последующем поиске в представлении QUERY_HISTORY.

dbt поддерживает задание значения query tag по умолчанию на время всех подключений к Snowflake в профиле. Более точные значения (а также переопределение значения по умолчанию) можно задать для подмножеств моделей с помощью конфигурации модели query_tag или переопределив макрос set_query_tag:

dbt_project.yml
models:
<resource-path>:
+query_tag: dbt_special
models/<modelname>.sql
{{ config(
query_tag = 'dbt_special'
) }}

select ...

В этом примере можно настроить query tag, который будет применяться к каждому запросу с именем модели.

{% macro set_query_tag() -%}
{% set new_query_tag = model.name %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}

Примечание: query tags задаются на уровне сессии. В начале каждой materialization, если у модели настроен собственный query_tag, dbt выполняет alter session set query_tag, чтобы установить новое значение. В конце материализации dbt выполняет ещё один alter, чтобы вернуть значение по умолчанию. Поэтому сбои сборки в середине материализации могут привести к тому, что последующие запросы будут выполняться с некорректным тегом.

Поведение merge (инкрементные модели)

Конфигурация incremental_strategy управляет тем, как dbt строит инкрементные модели. По умолчанию dbt использует оператор merge в Snowflake для обновления инкрементных таблиц.

Snowflake поддерживает следующие инкрементные стратегии:

  • merge (по умолчанию)
  • append
  • delete+insert
  • insert_overwrite
    • Примечание: это нестандартная инкрементная стратегия dbt. insert_overwrite ведёт себя как команды truncate + повторный insert в Snowflake. Она не поддерживает перезапись по партициям и намеренно перезаписывает всю таблицу. Реализована как инкрементная стратегия, так как соответствует рабочему процессу dbt без удаления существующих таблиц.
  • microbatch

Оператор merge в Snowflake завершается ошибкой «nondeterministic merge», если unique_key, указанный в конфигурации модели, на самом деле не является уникальным. В этом случае можно указать dbt использовать двухшаговый инкрементный подход, установив для модели incremental_strategy в delete+insert.

Настройка кластеризации таблиц

dbt поддерживает кластеризацию таблиц в Snowflake. Для управления кластеризацией table или инкрементной модели используется конфигурация cluster_by. При её применении dbt выполняет два действия:

  1. Неявно сортирует результаты таблицы по указанным полям cluster_by.
  2. Добавляет указанные ключи кластеризации в целевую таблицу.

Используя поля cluster_by для сортировки таблицы, dbt минимизирует объём работы, необходимый для автоматической кластеризации Snowflake. Если инкрементная модель настроена с кластеризацией, dbt также сортирует промежуточный датасет перед merge в целевую таблицу. Таким образом, таблица, управляемая dbt, практически всегда остаётся в кластеризованном состоянии.

Использование cluster_by

Конфигурация cluster_by принимает строку или список строк, используемых в качестве ключей кластеризации. В следующем примере создаётся таблица sessions, кластеризованная по колонке session_start.

models/events/sessions.sql
{{
config(
materialized='table',
cluster_by=['session_start']
)
}}

select
session_id,
min(event_time) as session_start,
max(event_time) as session_end,
count(*) as count_pageviews

from {{ source('snowplow', 'event') }}
group by 1

Этот код будет скомпилирован в SQL, который выглядит примерно так:

create or replace table my_database.my_schema.my_table as (

select * from (
select
session_id,
min(event_time) as session_start,
max(event_time) as session_end,
count(*) as count_pageviews

from {{ source('snowplow', 'event') }}
group by 1
)

-- этот order by добавляется dbt для создания
-- таблицы уже в кластеризованном виде
order by session_start

);

alter table my_database.my_schema.my_table cluster by (session_start);

Кластеризация динамических таблиц

Начиная с dbt Core v1.11, динамические таблицы поддерживают конфигурацию cluster_by. При её указании dbt включает параметры кластеризации в оператор CREATE DYNAMIC TABLE.

Например:

{{ config(
materialized='dynamic_table',
snowflake_warehouse='COMPUTE_WH',
target_lag='1 minute',
cluster_by=['session_start', 'user_id']
) }}

select
session_id,
user_id,
min(event_time) as session_start,
max(event_time) as session_end,
count(*) as count_pageviews
from {{ source('snowplow', 'event') }}
group by 1, 2

Эта конфигурация сгенерирует следующий SQL при компиляции:

create or replace dynamic table my_database.my_schema.my_table
target_lag = '1 minute'
warehouse = COMPUTE_WH
cluster by (session_start, user_id)
as (
select
session_id,
user_id,
min(event_time) as session_start,
max(event_time) as session_end,
count(*) as count_pageviews
from source_table
group by 1, 2
);

Кластеризацию для динамических таблиц можно указать при создании с помощью CLUSTER BY в операторе CREATE DYNAMIC TABLE. Выполнять отдельный ALTER TABLE не требуется.

Автоматическая кластеризация

Автоматическая кластеризация включена по умолчанию в Snowflake, и для её использования не требуется никаких дополнительных действий. Хотя существует конфигурация automatic_clustering, она не оказывает эффекта, за исключением аккаунтов с (устаревшей) ручной кластеризацией.

Если в вашем аккаунте всё ещё включена ручная кластеризация, можно использовать конфигурацию automatic_clustering, чтобы управлять автоматической кластеризацией для моделей dbt. Когда automatic_clustering установлена в true, dbt выполнит запрос alter table <table name> resume recluster после сборки целевой таблицы.

Конфигурацию automatic_clustering можно указать в файле dbt_project.yml или в блоке config() модели.

dbt_project.yml
models:
+automatic_clustering: true

Конфигурация Python-моделей

Адаптер Snowflake поддерживает Python-модели. Snowflake использует собственный фреймворк Snowpark, который во многом похож на PySpark.

Дополнительная настройка: для использования пакетов Anaconda необходимо принять условия Snowflake Third Party Terms.

Установка пакетов: Snowpark поддерживает ряд популярных пакетов через Anaconda. См. полный список для подробностей. Пакеты устанавливаются во время выполнения модели. Разные модели могут иметь разные зависимости. При использовании сторонних пакетов Snowflake рекомендует использовать выделенный виртуальный warehouse для лучшей производительности.

Версия Python: чтобы указать другую версию Python, используйте следующую конфигурацию:

def model(dbt, session):
dbt.config(
materialized = "table",
python_version="3.11"
)

Конфигурация python_version позволяет запускать Snowpark-модели с версиями Python 3.9, 3.10 или 3.11.

External access integrations и secrets: для запросов к внешним API внутри Python-моделей dbt используйте Snowflake external access вместе с secrets. Ниже приведён пример дополнительных конфигураций:

import pandas
import snowflake.snowpark as snowpark

def model(dbt, session: snowpark.Session):
dbt.config(
materialized="table",
secrets={"secret_variable_name": "test_secret"},
external_access_integrations=["test_external_access_integration"],
)
import _snowflake
return session.create_dataframe(
pandas.DataFrame(
[{"secret_value": _snowflake.get_generic_secret_string('secret_variable_name')}]
)
)

Документация: "Developer Guide: Snowpark Python"

Сторонние пакеты Snowflake

Чтобы использовать сторонний пакет Snowflake, отсутствующий в Snowflake Anaconda, загрузите его, следуя этому примеру, а затем настройте параметр imports в Python-модели dbt, указав zip-файл в Snowflake staging.

Ниже приведён полный пример конфигурации с использованием zip-файла и параметра imports:

def model(dbt, session):
# Configure the model
dbt.config(
materialized="table",
imports=["@mystage/mycustompackage.zip"], # Specify the external package location
)

# Example data transformation using the imported package
# (Assuming `some_external_package` has a function we can call)
data = {
"name": ["Alice", "Bob", "Charlie"],
"score": [85, 90, 88]
}
df = pd.DataFrame(data)

# Process data with the external package
df["adjusted_score"] = df["score"].apply(lambda x: some_external_package.adjust_score(x))

# Return the DataFrame as the model output
return df

Подробнее см. в документации Snowflake о загрузке и использовании сторонних Python-пакетов в Snowpark, не опубликованных в канале Anaconda.

Настройка виртуальных warehouse

Виртуальный warehouse по умолчанию, который использует dbt, можно настроить в профиле для подключений Snowflake. Чтобы переопределить warehouse для отдельных моделей (или групп моделей), используйте конфигурацию snowflake_warehouse. Это позволяет назначать более крупный warehouse для определённых моделей, чтобы управлять стоимостью Snowflake и временем сборки проекта.

Ниже приведён пример изменения warehouse для группы моделей с помощью аргумента конфигурации в YAML.

dbt_project.yml
name: my_project
version: 1.0.0

...

models:
+snowflake_warehouse: "EXTRA_SMALL" # default Snowflake virtual warehouse for all models in the project.
my_project:
clickstream:
+snowflake_warehouse: "EXTRA_LARGE" # override the default Snowflake virtual warehouse for all models under the `clickstream` directory.
snapshots:
+snowflake_warehouse: "EXTRA_LARGE" # all Snapshot models are configured to use the `EXTRA_LARGE` warehouse.

Копирование прав (Copying grants)

Когда конфигурация copy_grants установлена в true, dbt добавляет квалификатор copy grants к DDL при пересборке таблиц и представлений. Значение по умолчанию — false.

dbt_project.yml
models:
+copy_grants: true

Secure views

Чтобы создать secure view в Snowflake, используйте конфигурацию secure для моделей с материализацией view. Secure views применяются для ограничения доступа к чувствительным данным. Примечание: secure views могут снижать производительность, поэтому используйте их только при необходимости.

Следующий пример настраивает модели в папке sensitive/ как secure views.

dbt_project.yml
name: my_project
version: 1.0.0

models:
my_project:
sensitive:
+materialized: view
+secure: true

Известное ограничение source freshness

Snowflake рассчитывает source freshness на основе значения колонки LAST_ALTERED, то есть использует поле, обновляемое при любых изменениях объекта, а не только при обновлении данных. Никаких действий предпринимать не нужно, однако аналитическим командам следует учитывать этот нюанс.

Согласно документации Snowflake:

Колонка LAST_ALTERED обновляется при выполнении следующих операций над объектом:

  • DDL-операции.
  • DML-операции (только для таблиц).
  • Фоновые операции обслуживания метаданных, выполняемые Snowflake.

Нашли ошибку?

0
Loading