Перейти к основному содержимому

Добавление источников в ваш DAG

Использование источников

Источники позволяют задавать имена и описывать данные, которые загружаются в ваше хранилище с помощью инструментов Extract и Load. Объявив эти таблицы как источники в dbt, вы получаете возможность:

  • выбирать данные из исходных таблиц в ваших моделях с помощью функции {{ source() }}, что помогает определить происхождение (lineage) ваших данных
  • проверять предположения о ваших исходных данных с помощью тестов
  • рассчитывать актуальность (freshness) ваших исходных данных

Объявление источника

Источники определяются в файлах .yml, вложенных под ключом sources:.

models/<filename>.yml

sources:
- name: jaffle_shop
database: raw
schema: jaffle_shop
tables:
- name: orders
- name: customers

- name: stripe
tables:
- name: payments

*По умолчанию, schema будет таким же, как и name. Добавьте schema только если вы хотите использовать имя источника, отличающееся от существующей схемы.

Если вы еще не знакомы с этими файлами, обязательно ознакомьтесь с документацией по файлам properties.yml перед продолжением.

Выборка из источника

После того как источник был определен, его можно ссылаться из модели, используя функцию {{ source()}}.

models/orders.sql
select
...

from {{ source('jaffle_shop', 'orders') }}

left join {{ source('jaffle_shop', 'customers') }} using (customer_id)

dbt скомпилирует это в полное имя table:

target/compiled/jaffle_shop/models/my_model.sql

select
...

from raw.jaffle_shop.orders

left join raw.jaffle_shop.customers using (customer_id)

Использование функции {{ source () }} также создает зависимость между моделью и исходной таблицей.

Функция source сообщает dbt, что модель зависит от источникаФункция source сообщает dbt, что модель зависит от источника

Тестирование и документирование источников

Вы также можете:

  • Добавлять тесты данных к источникам
  • Добавлять описания к источникам, которые будут отображаться как часть вашего сайта документации

Эти концепции должны быть вам уже знакомы, если вы ранее добавляли тесты данных и описания к своим моделям (если нет — ознакомьтесь с руководствами по тестированию и документации).

models/<filename>.yml

sources:
- name: jaffle_shop
description: Это реплика базы данных Postgres, используемой нашим приложением
tables:
- name: orders
database: raw
description: >
Одна запись на заказ. Включает отмененные и удаленные заказы.
columns:
- name: id
description: Первичный ключ таблицы orders
data_tests:
- unique
- not_null
- name: status
description: Обратите внимание, что статус может изменяться со временем

- name: ...

- name: ...

Вы можете найти больше деталей о доступных свойствах для источников в справочном разделе.

Часто задаваемые вопросы

Что делать, если мой источник находится в плохо названной схеме или таблице?
Что делать, если мой источник находится в другой базе данных, чем целевая база данных?
Мне нужно использовать кавычки для выбора из моего источника, что мне делать?
Как запустить data-тесты только для источников?
Как запустить модели, зависящие от одного источника?

Актуальность данных источника

С помощью нескольких дополнительных конфигураций dbt может опционально фиксировать "актуальность" данных в ваших исходных таблицах. Это полезно для понимания, находятся ли ваши конвейеры данных в здоровом состоянии, и является критическим компонентом определения SLA для вашего хранилища.

Объявление актуальности источника

Чтобы настроить информацию об актуальности источника, добавьте блок freshness к вашему источнику и loaded_at_field к объявлению вашей таблицы:

models/<filename>.yml

sources:
- name: jaffle_shop
database: raw
config:
freshness: # значения freshness по умолчанию
# перенесено в config в версии v1.9
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
loaded_at_field: _etl_loaded_at # перенесено в config в версии v1.10

tables:
- name: orders
config:
freshness: # делаем требования немного строже
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}

- name: customers # это наследует актуальность по умолчанию, определенную в блоке источника jaffle_shop в начале


- name: product_skus
config:
freshness: null # не проверять свежесть для этой таблицы

В блоке freshness можно указать одно или оба из warn_after и error_after. Если ни одно из них не указано, то dbt не будет рассчитывать актуальность для таблиц в этом источнике.

Кроме того, поле loaded_at_field требуется для расчёта свежести таблицы (за исключением случаев, когда dbt может использовать метаданные хранилища для расчёта свежести). Если loaded_at_field или жизнеспособная альтернатива не указаны, dbt не будет рассчитывать свежесть для этой таблицы.

Эти конфигурации применяются иерархически, поэтому значения freshness и loaded_at_field, указанные для source, будут применяться ко всем tables, определенным в этом источнике. Это полезно, когда все таблицы в источнике имеют одинаковый loaded_at_field, так как конфигурацию можно указать один раз в определении источника верхнего уровня.

Проверка актуальности источника

Чтобы получить информацию об актуальности ваших источников, используйте команду dbt source freshness (справочная документация):

$ dbt source freshness

За кулисами dbt использует свойства актуальности для построения запроса select, показанного ниже. Вы можете найти этот запрос в журналах запросов.

select
max(_etl_loaded_at) as max_loaded_at,
convert_timezone('UTC', current_timestamp()) as calculated_at
from raw.jaffle_shop.orders

Результаты этого запроса используются для определения, является ли источник актуальным или нет:

Ой! Не все так свежо, как хотелось бы!Ой! Не все так свежо, как хотелось бы!

Сборка моделей на основе свежести источников

В качестве лучшей практики мы рекомендуем использовать freshness источников данных. Это позволяет перенести настройки в файл .yml, где свежесть источника определяется на уровне модели.

Чтобы собирать модели на основе свежести источников в dbt:

  1. Запустите dbt source freshness, чтобы проверить свежесть ваших источников.
  2. Используйте команду dbt build --select source_status:fresher+, чтобы собрать и протестировать модели, расположенные ниже по цепочке от более свежих источников.

Использование этих команд в указанном порядке гарантирует, что модели будут обновляться с использованием самых актуальных данных. Это устраняет лишние вычислительные затраты на неизменившиеся данные и обеспечивает сборку моделей только тогда, когда это действительно необходимо.

Установите source freshness snapshots на 30 минут для проверки свежести источников, а затем настройте задание, которое будет запускаться каждый час для пересборки моделей. Такая конфигурация получает все модели и пересобирает их за один запуск, если срок свежести их источников истёк. Подробнее см. Частота snapshot’ов свежести источников.

Фильтр

В некоторых базах данных могут быть таблицы, для которых требуется применять фильтр по определённым колонкам, чтобы избежать полного сканирования таблицы, что может быть дорогостоящим. Чтобы выполнить проверку свежести для таких таблиц, в конфигурацию можно добавить аргумент filter, например: filter: _etl_loaded_at >= date_sub(current_date(), interval 1 day). В приведённом выше примере итоговый запрос будет выглядеть следующим образом

select
max(_etl_loaded_at) as max_loaded_at,
convert_timezone('UTC', current_timestamp()) as calculated_at
from raw.jaffle_shop.orders
where _etl_loaded_at >= date_sub(current_date(), interval 1 day)

Часто задаваемые вопросы

Как исключить таблицу из снимка актуальности?
Как сделать снимок актуальности только для одного источника?
Сохраняются ли где-нибудь результаты проверки актуальности?

Нашли ошибку?

100
Loading