Перейти к основному содержимому

Конфигурации ресурсов DeltaStream

Поддерживаемые материализации

DeltaStream поддерживает несколько специализированных типов материализации, которые соответствуют его возможностям потоковой обработки:

Стандартные материализации

МатериализацияОписание
ephemeralЭта материализация использует common table expressions (CTE) в DeltaStream «под капотом».
tableКлассическая пакетная материализация в виде таблицы
materialized_viewНепрерывно обновляемое представление, которое автоматически обновляется при изменении исходных данных
Loading table...

Потоковые материализации

МатериализацияОписание
streamЧисто потоковая трансформация, обрабатывающая данные в реальном времени
changelogПоток change data capture (CDC), отслеживающий изменения в данных
Loading table...

Инфраструктурные материализации

МатериализацияОписание
storeПодключение к внешней системе (Kafka, PostgreSQL и т.д.)
entityОпределение сущности внутри хранилища
databaseОпределение базы данных
compute_poolОпределение пула вычислительных ресурсов
functionПользовательские функции (UDF) на Java
function_sourceИсточники JAR-файлов для UDF
descriptor_sourceИсточники схем protocol buffer
schema_registryПодключения к schema registry (Confluent и т.п.)
Loading table...

Конфигурации SQL-моделей

Материализация table

Создаёт классическую пакетную таблицу для агрегированных данных:

Конфигурация в project YAML-файле:

models:
<resource-path>:
+materialized: table

SQL-конфигурация:

{{ config(materialized = "table") }}

SELECT
date,
SUM(amount) as daily_total
FROM {{ ref('transactions') }}
GROUP BY date

Материализация stream

Создаёт непрерывную потоковую трансформацию:

Конфигурация в project YAML-файле:

models:
<resource-path>:
+materialized: stream
+parameters:
topic: 'stream_topic'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'

SQL-конфигурация:

{{ config(
materialized='stream',
parameters={
'topic': 'purchase_events',
'value.format': 'json',
'key.format': 'primitive',
'key.type': 'VARCHAR',
'timestamp': 'event_time'
}
) }}

SELECT
event_time,
user_id,
action
FROM {{ ref('source_stream') }}
WHERE action = 'purchase'

Параметры конфигурации stream

ПараметрОписаниеОбязательный
materializedСпособ материализации модели. Для потоковой модели должен быть stream.Да
topicИмя топика для выходного потока.Да
value.formatФормат значений потока (например, json, avro).Да
key.formatФормат ключей потока (например, primitive, json).Нет
key.typeТип данных ключей потока (например, VARCHAR, BIGINT).Нет
timestampИмя колонки, используемой как временная метка события.Нет
Loading table...

Материализация changelog

Фиксирует изменения в потоке данных:

Конфигурация в project YAML-файле:

models:
<resource-path>:
+materialized: changelog
+parameters:
topic: 'changelog_topic'
value.format: 'json'
+primary_key: [column_name]

SQL-конфигурация:

{{ config(
materialized='changelog',
parameters={
'topic': 'order_updates',
'value.format': 'json'
},
primary_key=['order_id']
) }}

SELECT
order_id,
status,
updated_at
FROM {{ ref('orders_stream') }}

Параметры конфигурации changelog

ПараметрОписаниеОбязательный
materializedСпособ материализации модели. Для changelog должен быть changelog.Да
topicИмя топика для выходного changelog-потока.Да
value.formatФормат значений changelog (например, json, avro).Да
primary_keyСписок колонок, однозначно идентифицирующих строки для отслеживания изменений.Да
Loading table...

Материализованное представление

Создаёт непрерывно обновляемое представление:

SQL-конфигурация:

{{ config(materialized='materialized_view') }}

SELECT
product_id,
COUNT(*) as purchase_count
FROM {{ ref('purchase_events') }}
GROUP BY product_id

Ресурсы, определяемые только в YAML

DeltaStream поддерживает два типа определений моделей для инфраструктурных компонентов:

  1. Управляемые ресурсы (Models) — автоматически включаются в dbt DAG
  2. Неуправляемые ресурсы (Sources) — создаются по требованию с помощью специальных макросов

Когда использовать управляемые или неуправляемые ресурсы?

  • Используйте управляемые ресурсы, если вы планируете пересоздавать всю инфраструктуру в разных окружениях и/или применять операторы графа для выполнения только создания конкретных ресурсов и зависимых трансформаций.
  • В противном случае может быть проще использовать неуправляемые ресурсы, чтобы избежать файлов-заглушек.

Управляемые ресурсы (models)

Управляемые ресурсы автоматически включаются в dbt DAG и описываются как модели:

version: 2
models:
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"

- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"

- name: user_events_stream
config:
materialized: stream
columns:
event_time:
type: TIMESTAMP
not_null: true
user_id:
type: VARCHAR
action:
type: VARCHAR
parameters:
topic: 'user_events'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'

- name: order_changes
config:
materialized: changelog
columns:
order_id:
type: VARCHAR
not_null: true
status:
type: VARCHAR
updated_at:
type: TIMESTAMP
primary_key:
- order_id
parameters:
topic: 'order_updates'
value.format: 'json'

- name: pv_kinesis
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3

- name: my_compute_pool
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5

- name: my_function_source
config:
materialized: function_source
parameters:
file: '@/path/to/my-functions.jar'
description: 'Custom utility functions'

- name: my_descriptor_source
config:
materialized: descriptor_source
parameters:
file: '@/path/to/schemas.desc'
description: 'Protocol buffer schemas for data structures'

- name: my_custom_function
config:
materialized: function
parameters:
args:
- name: input_text
type: VARCHAR
returns: VARCHAR
language: JAVA
source.name: 'my_function_source'
class.name: 'com.example.TextProcessor'

- name: my_schema_registry
config:
materialized: schema_registry
parameters:
type: "CONFLUENT"
access_region: "AWS us-east-1"
uris: "https://url.to.schema.registry.listener:8081"
'confluent.username': 'fake_username'
'confluent.password': 'fake_password'
'tls.client.cert_file': '@/path/to/tls/client_cert_file'
'tls.client.key_file': '@/path/to/tls_key'

Примечание: Из‑за текущих ограничений dbt управляемые YAML-only ресурсы требуют наличия .sql файла-заглушки без оператора SELECT. Например, создайте my_kafka_store.sql со следующим содержимым:

-- Placeholder

Неуправляемые ресурсы (sources)

Неуправляемые ресурсы определяются как sources и создаются по требованию с помощью специальных макросов:

version: 2
sources:
- name: infrastructure
tables:
- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"

- name: ps_store
config:
materialized: store
parameters:
type: POSTGRESQL
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"

- name: user_events_stream
config:
materialized: stream
columns:
event_time:
type: TIMESTAMP
not_null: true
user_id:
type: VARCHAR
action:
type: VARCHAR
parameters:
topic: 'user_events'
value.format: 'json'
key.format: 'primitive'
key.type: 'VARCHAR'
timestamp: 'event_time'

- name: order_changes
config:
materialized: changelog
columns:
order_id:
type: VARCHAR
not_null: true
status:
type: VARCHAR
updated_at:
type: TIMESTAMP
primary_key:
- order_id
parameters:
topic: 'order_updates'
value.format: 'json'

- name: pv_kinesis
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3

- name: compute_pool_small
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5

- name: my_function_source
config:
materialized: function_source
parameters:
file: '@/path/to/my-functions.jar'
description: 'Custom utility functions'

- name: my_descriptor_source
config:
materialized: descriptor_source
parameters:
file: '@/path/to/schemas.desc'
description: 'Protocol buffer schemas for data structures'

- name: my_custom_function
config:
materialized: function
parameters:
args:
- name: input_text
type: VARCHAR
returns: VARCHAR
language: JAVA
source.name: 'my_function_source'
class.name: 'com.example.TextProcessor'

- name: my_schema_registry
config:
materialized: schema_registry
parameters:
type: "CONFLUENT"
access_region: "AWS us-east-1"
uris: "https://url.to.schema.registry.listener:8081"
'confluent.username': 'fake_username'
'confluent.password': 'fake_password'
'tls.client.cert_file': '@/path/to/tls/client_cert_file'
'tls.client.key_file': '@/path/to/tls_key'

Для создания неуправляемых ресурсов:

# Создать все источники
dbt run-operation create_sources

# Создать конкретный источник
dbt run-operation create_source_by_name --args '{source_name: infrastructure}'

Конфигурации Store

Kafka store

- name: my_kafka_store
config:
materialized: store
parameters:
type: KAFKA
access_region: "AWS us-east-1"
uris: "kafka.broker1.url:9092,kafka.broker2.url:9092"
tls.ca_cert_file: "@/certs/us-east-1/self-signed-kafka-ca.crt"

PostgreSQL store

- name: postgres_store
config:
materialized: store
parameters:
type: POSTGRESQL
access_region: "AWS us-east-1"
uris: "postgresql://mystore.com:5432/demo"
postgres.username: "user"
postgres.password: "password"

Конфигурация Entity

- name: kinesis_entity
config:
materialized: entity
store: kinesis_store
parameters:
'kinesis.shards': 3

Конфигурация Compute pool

- name: processing_pool
config:
materialized: compute_pool
parameters:
'compute_pool.size': 'small'
'compute_pool.timeout_min': 5

Ссылки на ресурсы

Управляемые ресурсы

Используйте стандартную функцию ref():

select * from {{ ref('my_kafka_stream') }}

Неуправляемые ресурсы

Используйте функцию source():

SELECT * FROM {{ source('infrastructure', 'user_events_stream') }}

Сиды (Seeds)

Загружайте CSV-данные в существующие сущности DeltaStream с помощью материализации seed. В отличие от классических dbt seeds, которые создают новые таблицы, seeds в DeltaStream вставляют данные в уже существующие сущности.

Конфигурация

Seeds настраиваются в YAML и поддерживают следующие параметры:

Обязательные:

  • entity — имя целевой сущности, в которую будут вставляться данные

Необязательные:

  • store — имя store, содержащего сущность (можно опустить, если сущность не находится в store)
  • with_params — словарь параметров для секции WITH
  • quote_columns — управление экранированием имён колонок. Значение по умолчанию: false. Возможные варианты:
    • true — экранировать все колонки
    • false — не экранировать колонки (по умолчанию)
    • string — если указано '*', экранировать все колонки
    • list — список имён колонок для экранирования

Пример конфигурации

Со Store (экранирование включено):

# seeds.yml
version: 2

seeds:
- name: user_data_with_store_quoted
config:
entity: 'user_events'
store: 'kafka_store'
with_params:
kafka.topic.retention.ms: '86400000'
partitioned: true
quote_columns: true

Использование

  1. Поместите CSV-файлы в каталог seeds/
  2. Настройте seeds в YAML, указав обязательный параметр entity
  3. При необходимости укажите store
  4. Выполните dbt seed для загрузки данных
Важно

Целевая сущность должна уже существовать в DeltaStream до запуска seeds. Seeds только вставляют данные и не создают сущности.

Материализации функций и источников

DeltaStream поддерживает пользовательские функции (UDF) и их зависимости через специализированные материализации.

Поддержка прикрепления файлов

Адаптер предоставляет единый механизм работы с файлами для function source и descriptor source:

  • Стандартизированный интерфейс — общая логика работы с файлами
  • Разрешение путей — поддержка абсолютных и относительных путей (включая синтаксис @)
  • Автоматическая валидация — проверка существования и доступности файлов перед прикреплением

Источник функций (function_source)

Создаёт источник функций из JAR-файла с Java-функциями:

SQL-конфигурация:

{{ config(
materialized='function_source',
parameters={
'file': '@/path/to/my-functions.jar',
'description': 'Custom utility functions'
}
) }}

SELECT 1 as placeholder

Источник дескрипторов (descriptor_source)

Создаёт источник дескрипторов из скомпилированных файлов protocol buffer:

SQL-конфигурация:

{{ config(
materialized='descriptor_source',
parameters={
'file': '@/path/to/schemas.desc',
'description': 'Protocol buffer schemas for data structures'
}
) }}

SELECT 1 as placeholder
Примечание

Для descriptor source требуются скомпилированные .desc файлы, а не исходные .proto. Скомпилируйте схемы protobuf следующим образом:

protoc --descriptor_set_out=schemas/my_schemas.desc schemas/my_schemas.proto

Функция (function)

Создаёт пользовательскую функцию, ссылающуюся на function source:

SQL-конфигурация:

{{ config(
materialized='function',
parameters={
'args': [
{'name': 'input_text', 'type': 'VARCHAR'}
],
'returns': 'VARCHAR',
'language': 'JAVA',
'source.name': 'my_function_source',
'class.name': 'com.example.TextProcessor'
}
) }}

SELECT 1 as placeholder

Материализация Schema registry

Создаёт подключение к schema registry:

SQL-конфигурация:

{{ config(
materialized='schema_registry',
parameters={
'type': 'CONFLUENT',
'access_region': 'AWS us-east-1',
'uris': 'https://url.to.schema.registry.listener:8081',
'confluent.username': 'fake_username',
'confluent.password': 'fake_password',
'tls.client.cert_file': '@/path/to/tls/client_cert_file',
'tls.client.key_file': '@/path/to/tls_key'
}
) }}

SELECT 1 as placeholder

Макросы управления запросами

Адаптер DeltaStream для dbt предоставляет макросы для просмотра и управления выполняемыми запросами.

Список всех запросов

Макрос list_all_queries выводит все запросы, известные DeltaStream, включая их состояние, владельца и SQL:

dbt run-operation list_all_queries

Описание запроса

Макрос describe_query позволяет посмотреть логи и детали конкретного запроса:

dbt run-operation describe_query --args '{query_id: "<QUERY_ID>"}'

Остановка конкретного запроса

Макрос terminate_query завершает запрос по его ID:

dbt run-operation terminate_query --args '{query_id: "<QUERY_ID>"}'

Остановка всех выполняющихся запросов

Макрос terminate_all_queries завершает все текущие запросы:

dbt run-operation terminate_all_queries

Перезапуск запроса

Макрос restart_query позволяет перезапустить упавший запрос по его ID:

dbt run-operation restart_query --args '{query_id: "<QUERY_ID>"}'

Макрос application

Выполнение нескольких операторов как одной операции

Макрос application позволяет выполнить несколько SQL-операторов DeltaStream как единую транзакционную единицу с семантикой «всё или ничего»:

dbt run-operation application --args '{
application_name: "my_data_pipeline",
statements: [
"USE DATABASE my_db",
"CREATE STREAM user_events WITH (topic='"'"'events'"'"', value.format='"'"'json'"'"')",
"CREATE MATERIALIZED VIEW user_counts AS SELECT user_id, COUNT(*) FROM user_events GROUP BY user_id"
]
}'

Устранение неполадок

Готовность function source

Если при создании функций возникает ошибка «function source is not ready»:

  1. Автоматические повторы — адаптер автоматически повторяет попытки с экспоненциальной задержкой
  2. Настройка таймаута — стандартный таймаут 30 секунд можно увеличить для больших JAR-файлов
  3. Порядок зависимостей — убедитесь, что function source создаётся до зависящих от него функций
  4. Ручной повтор — если автоматические попытки не помогли, подождите несколько минут и повторите операцию

Проблемы с прикреплением файлов

При проблемах с прикреплением файлов в function source и descriptor source:

  1. Пути к файлам — используйте синтаксис @/path/to/file для путей относительно проекта

  2. Типы файлов:

    • function source требует .jar файлы
    • descriptor source требует скомпилированные .desc файлы (не .proto)
  3. Валидация файлов — адаптер проверяет существование файлов перед прикреплением

  4. Компиляция — для descriptor source убедитесь, что protobuf-файлы скомпилированы:

    protoc --descriptor_set_out=output.desc input.proto

Нашли ошибку?

0
Loading