Перейти к основному содержимому

freshness

models/<filename>.yml

version: 2

sources:
- name: <source_name>
freshness:
warn_after:
count: <positive_integer>
period: minute | hour | day
error_after:
count: <positive_integer>
period: minute | hour | day
filter: <boolean_sql_expression>
loaded_at_field: <column_name_or_expression>

tables:
- name: <table_name>
freshness:
warn_after:
count: <positive_integer>
period: minute | hour | day
error_after:
count: <positive_integer>
period: minute | hour | day
filter: <boolean_sql_expression>
loaded_at_field: <column_name_or_expression>
...

Определение

Блок свежести используется для определения допустимого времени между самой последней записью и текущим моментом, чтобы считалась "свежей".

В блоке freshness можно указать один или оба параметра warn_after и error_after. Если ни один из них не указан, dbt не будет рассчитывать снимки свежести для таблиц в этом источнике.

В большинстве случаев требуется указать loaded_at_field. Некоторые адаптеры поддерживают расчет свежести источника на основе метаданных таблиц хранилища и могут исключить loaded_at_field.

Если у источника есть блок freshness:, dbt попытается рассчитать свежесть для этого источника:

  • Если указан loaded_at_field, dbt будет рассчитывать свежесть с помощью запроса select (поведение до версии v1.7).
  • Если loaded_at_field не указан, dbt будет рассчитывать свежесть с помощью метаданных таблиц хранилища, когда это возможно (новое в v1.7 для поддерживаемых адаптеров).

В настоящее время расчет свежести на основе метаданных таблиц хранилища поддерживается для следующих адаптеров:

Поддержка скоро появится для адаптера Spark.

Блоки свежести применяются иерархически:

  • свойства freshness и loaded_at_field, добавленные к источнику, будут применены ко всем таблицам, определенным в этом источнике.
  • свойства freshness и loaded_at_field, добавленные к таблице источника, переопределят любые свойства, примененные к источнику.

Это полезно, когда все таблицы в источнике имеют одно и то же значение loaded_at_field, что часто бывает.

Чтобы исключить источник из расчетов свежести, у вас есть два варианта:

  • Не добавлять блок freshness:.
  • Явно установить freshness: null.

loaded_at_field

Необязателен для адаптеров, которые поддерживают получение свежести из метаданных таблиц хранилища, в противном случае требуется.

Имя столбца (или выражение), которое возвращает временную метку, указывающую на свежесть.

Если используется поле даты, возможно, вам придется привести его к типу timestamp:

loaded_at_field: "completed_date::timestamp"

Или, в зависимости от вашего варианта SQL:

loaded_at_field: "CAST(completed_date AS TIMESTAMP)"

Если используется временная метка, не относящаяся к UTC, сначала приведите ее к UTC:

loaded_at_field: "convert_timezone('Australia/Sydney', 'UTC', created_at_local)"

count

(Обязательный)

Положительное целое число, указывающее количество периодов, в течение которых источник данных все еще считается "свежим".

period

(Обязательный)

Период времени, используемый в расчете свежести. Один из minute, hour или day.

filter

(необязательный)

Добавьте условие where к запросу, выполняемому dbt source freshness, чтобы ограничить объем обрабатываемых данных.

Этот фильтр применяется только к запросам свежести источника dbt - он не повлияет на другие использования таблицы источника.

Это особенно полезно, если:

  • Вы используете BigQuery, и ваши таблицы источников являются разделенными таблицами.
  • Вы используете Snowflake, Databricks или Spark с большими таблицами, и это приводит к улучшению производительности.

Примеры

Полный пример

models/<filename>.yml

version: 2

sources:
- name: jaffle_shop
database: raw

freshness: # стандартная свежесть
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}

loaded_at_field: _etl_loaded_at

tables:
- name: customers # это будет использовать свежесть, определенную выше

- name: orders
freshness: # сделаем это немного более строгим
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}
# Примените условие where в запросе свежести
filter: datediff('day', _etl_loaded_at, current_timestamp) < 2

- name: product_skus
freshness: # не проверять свежесть для этой таблицы

При выполнении dbt source freshness будет выполнен следующий запрос:

select
max(_etl_loaded_at) as max_loaded_at,
convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders

where datediff('day', _etl_loaded_at, current_timestamp) < 2

0