Конфигурации ресурсов DeltaStream
Поддерживаемые материализации
DeltaStream поддерживает несколько специализированных типов материализации, которые соответствуют его возможностям потоковой обработки:
Стандартные материализации
| Loading table... |
Потоковые материализации
| Loading table... |
Инфраструктурные материализации
| Loading table... |
Конфигурации SQL-моделей
Материализация table
Создаёт классическую пакетную таблицу для агрегированных данных:
Конфигурация в project YAML-файле:
models:
<resource-path>:
+materialized: table
SQL-конфигурация:
{{ config(materialized = "table") }}
SELECT
date,
SUM(amount) as daily_total
FROM {{ ref('transactions') }}
GROUP BY date
Материализация stream
Создаёт непрерывную потоковую трансформацию:
Конфигурация в project YAML-файле:
models:
<resource-path>:
+materialized: stream
+parameters:
topic: 'stream_topic'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'
SQL-конфигурация:
{{ config(
materialized='stream',
parameters={
'topic': 'purchase_events',
'value.format': 'json',
'key.format': 'primitive',
'key.type': 'VARCHAR',
'timestamp': 'event_time'
}
) }}
SELECT
event_time,
user_id,
action
FROM {{ ref('source_stream') }}
WHERE action = 'purchase'
Параметры конфигурации stream
| Loading table... |
Материализация changelog
Фиксирует изменения в потоке данных:
Конфигурация в project YAML-файле:
models:
<resource-path>:
+materialized: changelog
+parameters:
topic: 'changelog_topic'
value.format: 'json'
+primary_key: [column_name]
SQL-конфигурация:
{{ config(
materialized='changelog',
parameters={
'topic': 'order_updates',
'value.format': 'json'
},
primary_key=['order_id']
) }}
SELECT
order_id,
status,
updated_at
FROM {{ ref('orders_stream') }}
Параметры конфигурации changelog
| Loading table... |
Материализованное представление
Создаёт непрерывно обновляемое представление:
SQL-конфигурация:
{{ config(materialized='materialized_view') }}
SELECT
product_id,
COUNT(*) as purchase_count
FROM {{ ref('purchase_events') }}
GROUP BY product_id
Ресурсы, определяемые только в YAML
DeltaStream поддерживает два типа определений моделей для инфраструктурных компонентов:
- Управляемые ресурсы (Models) — автоматически включаются в dbt DAG
- Неуправляемые ресурсы (Sources) — создаются по требованию с помощью специальных макросов
Когда использовать управляемые или неуправляемые ресурсы?
- Используйте управляемые ресурсы, если вы планируете пересоздавать всю инфраструктуру в разных окружениях и/или применять операторы графа для выполнения только создания конкретных ресурсов и зависимых трансформаций.
- В противном случае может быть проще использовать неуправляемые ресурсы, чтобы избежать файлов-заглушек.
Управляемые ресурсы (models)
Управляемые ресурсы автоматически включаются в dbt DAG и описываются как модели:
version: 2
models:
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"
- name: user_events_stream
config:
materialized: stream
columns:
event_time:
type: TIMESTAMP
not_null: true
user_id:
type: VARCHAR
action:
type: VARCHAR
parameters:
topic: 'user_events'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'
- name: order_changes
config:
materialized: changelog
columns:
order_id:
type: VARCHAR
not_null: true
status:
type: VARCHAR
updated_at:
type: TIMESTAMP
primary_key:
- order_id
parameters:
topic: 'order_updates'
value.format: 'json'
- name: pv_kinesis
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3
- name: my_compute_pool
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5
- name: my_function_source
config:
materialized: function_source
parameters:
file: '@/path/to/my-functions.jar'
description: 'Custom utility functions'
- name: my_descriptor_source
config:
materialized: descriptor_source
parameters:
file: '@/path/to/schemas.desc'
description: 'Protocol buffer schemas for data structures'
- name: my_custom_function
config:
materialized: function
parameters:
args:
- name: input_text
type: VARCHAR
returns: VARCHAR
language: JAVA
source.name: 'my_function_source'
class.name: 'com.example.TextProcessor'
- name: my_schema_registry
config:
materialized: schema_registry
parameters:
type: "CONFLUENT"
access_region: "AWS us-east-1"
uris: "https://url.to.schema.registry.listener:8081"
'confluent.username': 'fake_username'
'confluent.password': 'fake_password'
'tls.client.cert_file': '@/path/to/tls/client_cert_file'
'tls.client.key_file': '@/path/to/tls_key'
Примечание: Из‑за текущих ограничений dbt управляемые YAML-only ресурсы требуют наличия .sql файла-заглушки без оператора SELECT. Например, создайте my_kafka_store.sql со следующим содержимым:
-- Placeholder
Неуправляемые ресурсы (sources)
Неуправляемые ресурсы определяются как sources и создаются по требованию с помощью специальных макросов:
version: 2
sources:
- name: infrastructure
tables:
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"
- name: user_events_stream
config:
materialized: stream
columns:
event_time:
type: TIMESTAMP
not_null: true
user_id:
type: VARCHAR
action:
type: VARCHAR
parameters:
topic: 'user_events'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'
- name: order_changes
config:
materialized: changelog
columns:
order_id:
type: VARCHAR
not_null: true
status:
type: VARCHAR
updated_at:
type: TIMESTAMP
primary_key:
- order_id
parameters:
topic: 'order_updates'
value.format: 'json'
- name: pv_kinesis
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3
- name: compute_pool_small
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5
- name: my_function_source
config:
materialized: function_source
parameters:
file: '@/path/to/my-functions.jar'
description: 'Custom utility functions'
- name: my_descriptor_source
config:
materialized: descriptor_source
parameters:
file: '@/path/to/schemas.desc'
description: 'Protocol buffer schemas for data structures'
- name: my_custom_function
config:
materialized: function
parameters:
args:
- name: input_text
type: VARCHAR
returns: VARCHAR
language: JAVA
source.name: 'my_function_source'
class.name: 'com.example.TextProcessor'
- name: my_schema_registry
config:
materialized: schema_registry
parameters:
type: "CONFLUENT"
access_region: "AWS us-east-1"
uris: "https://url.to.schema.registry.listener:8081"
'confluent.username': 'fake_username'
'confluent.password': 'fake_password'
'tls.client.cert_file': '@/path/to/tls/client_cert_file'
'tls.client.key_file': '@/path/to/tls_key'
Для создания неуправляемых ресурсов:
# Создать все источники
dbt run-operation create_sources
# Создать конкретный источник
dbt run-operation create_source_by_name --args '{source_name: infrastructure}'
Конфигурации Store
Kafka store
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"
PostgreSQL store
- name: postgres_store
config:
materialized: store
parameters:
type: POSTGRESQL
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"
Конфигурация Entity
- name: kinesis_entity
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3
Конфигурация Compute pool
- name: processing_pool
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5
Ссылки на ресурсы
Управляемые ресурсы
Используйте стандартную функцию ref():
select * from {{ ref('my_kafka_stream') }}
Неуправляемые ресурсы
Используйте функцию source():
SELECT * FROM {{ source('infrastructure', 'user_events_stream') }}
Сиды (Seeds)
Загружайте CSV-данные в существующие сущности DeltaStream с помощью материализации seed. В отличие от классических dbt seeds, которые создают новые таблицы, seeds в DeltaStream вставляют данные в уже существующие сущности.
Конфигурация
Seeds настраиваются в YAML и поддерживают следующие параметры:
Обязательные:
entity— имя целевой сущности, в которую будут вставляться данные
Необязательные:
store— имя store, содержащего сущность (можно опустить, если сущность не находится в store)with_params— словарь параметров для секции WITHquote_columns— управление экранированием имён колонок. Значение по умолчанию:false. Возможные варианты:true— экранировать все колонкиfalse— не экранировать колонки (по умолчанию)string— если указано'*', экранировать все колонкиlist— список имён колонок для экранирования
Пример конфигурации
Со Store (экранирование включено):
# seeds.yml
version: 2
seeds:
- name: user_data_with_store_quoted
config:
entity: 'user_events'
store: 'kafka_store'
with_params:
kafka.topic.retention.ms: '86400000'
partitioned: true
quote_columns: true
Использование
- Поместите CSV-файлы в каталог
seeds/ - Настройте seeds в YAML, указав обязательный параметр
entity - При необходимости укажите
store - Выполните
dbt seedдля загрузки данных
Целевая сущность должна уже существовать в DeltaStream до запуска seeds. Seeds только вставляют данные и не создают сущности.
Материализации функций и источников
DeltaStream поддерживает пользовательские функции (UDF) и их зависимости через специализированные материализации.
Поддержка прикрепления файлов
Адаптер предоставляет единый механизм работы с файлами для function source и descriptor source:
- Стандартизированный интерфейс — общая логика работы с файлами
- Разрешение путей — поддержка абсолютных и относительных путей (включая синтаксис
@) - Автоматическая валидация — проверка существования и доступности файлов перед прикреплением
Источник функций (function_source)
Создаёт источник функций из JAR-файла с Java-функциями:
SQL-конфигурация:
{{ config(
materialized='function_source',
parameters={
'file': '@/path/to/my-functions.jar',
'description': 'Custom utility functions'
}
) }}
SELECT 1 as placeholder
Источник дескрипторов (descriptor_source)
Создаёт источник дескрипторов из скомпилированных файлов protocol buffer:
SQL-конфигурация:
{{ config(
materialized='descriptor_source',
parameters={
'file': '@/path/to/schemas.desc',
'description': 'Protocol buffer schemas for data structures'
}
) }}
SELECT 1 as placeholder
Для descriptor source требуются скомпилированные .desc файлы, а не исходные .proto. Скомпилируйте схемы protobuf следующим образом:
protoc --descriptor_set_out=schemas/my_schemas.desc schemas/my_schemas.proto
Функция (function)
Создаёт пользовательскую функцию, ссылающуюся на function source:
SQL-конфигурация:
{{ config(
materialized='function',
parameters={
'args': [
{'name': 'input_text', 'type': 'VARCHAR'}
],
'returns': 'VARCHAR',
'language': 'JAVA',
'source.name': 'my_function_source',
'class.name': 'com.example.TextProcessor'
}
) }}
SELECT 1 as placeholder
Материализация Schema registry
Создаёт подключение к schema registry:
SQL-конфигурация:
{{ config(
materialized='schema_registry',
parameters={
'type': 'CONFLUENT',
'access_region': 'AWS us-east-1',
'uris': 'https://url.to.schema.registry.listener:8081',
'confluent.username': 'fake_username',
'confluent.password': 'fake_password',
'tls.client.cert_file': '@/path/to/tls/client_cert_file',
'tls.client.key_file': '@/path/to/tls_key'
}
) }}
SELECT 1 as placeholder
Макросы управления запросами
Адаптер DeltaStream для dbt предоставляет макросы для просмотра и управления выполняемыми запросами.
Список всех запросов
Макрос list_all_queries выводит все запросы, известные DeltaStream, включая их состояние, владельца и SQL:
dbt run-operation list_all_queries
Описание запроса
Макрос describe_query позволяет посмотреть логи и детали конкретного запроса:
dbt run-operation describe_query --args '{query_id: "<QUERY_ID>"}'
Остановка конкретного запроса
Макрос terminate_query завершает запрос по его ID:
dbt run-operation terminate_query --args '{query_id: "<QUERY_ID>"}'
Остановка всех выполняющихся запросов
Макрос terminate_all_queries завершает все текущие запросы:
dbt run-operation terminate_all_queries
Перезапуск запроса
Макрос restart_query позволяет перезапустить упавший запрос по его ID:
dbt run-operation restart_query --args '{query_id: "<QUERY_ID>"}'
Макрос application
Выполнение нескольких операторов как одной операции
Макрос application позволяет выполнить несколько SQL-операторов DeltaStream как единую транзакционную единицу с семантикой «всё или ничего»:
dbt run-operation application --args '{
application_name: "my_data_pipeline",
statements: [
"USE DATABASE my_db",
"CREATE STREAM user_events WITH (topic='"'"'events'"'"', value.format='"'"'json'"'"')",
"CREATE MATERIALIZED VIEW user_counts AS SELECT user_id, COUNT(*) FROM user_events GROUP BY user_id"
]
}'
Устранение неполадок
Готовность function source
Если при создании функций возникает ошибка «function source is not ready»:
- Автоматические повторы — адаптер автоматически повторяет попытки с экспоненциальной задержкой
- Настройка таймаута — стандартный таймаут 30 секунд можно увеличить для больших JAR-файлов
- Порядок зависимостей — убедитесь, что function source создаётся до зависящих от него функций
- Ручной повтор — если автоматические попытки не помогли, подождите несколько минут и повторите операцию
Проблемы с прикреплением файлов
При проблемах с прикреплением файлов в function source и descriptor source:
-
Пути к файлам — используйте синтаксис
@/path/to/fileдля путей относительно проекта -
Типы файлов:
- function source требует
.jarфайлы - descriptor source требует скомпилированные
.descфайлы (не.proto)
- function source требует
-
Валидация файлов — адаптер проверяет существование файлов перед прикреплением
-
Компиляция — для descriptor source убедитесь, что protobuf-файлы скомпилированы:
protoc --descriptor_set_out=output.desc input.proto