Перейти к основному содержимому

Python-модели

Обратите внимание, что только определённые платформы данных поддерживают модели dbt-py. Проверьте страницы конфигурации платформ, чтобы убедиться, что Python-модели поддерживаются.

Мы рекомендуем вам:

Обзор

Python модели dbt (dbt-py) могут помочь вам решить задачи, которые невозможно решить с помощью SQL. Вы можете выполнять анализы, используя инструменты, доступные в экосистеме Python с открытым исходным кодом, включая передовые пакеты для науки о данных и статистики. Ранее вам требовалась бы отдельная инфраструктура и оркестрация для выполнения Python трансформаций в производственной среде. Python трансформации, определенные в dbt, являются моделями в вашем проекте с теми же возможностями тестирования, документации и отслеживания.

models/my_python_model.py
import ...

def model(dbt, session):

my_sql_model_df = dbt.ref("my_sql_model")

final_df = ... # то, что нельзя написать на SQL!

return final_df
models/config.yml

models:
- name: my_python_model

# Документируйте в том же коде
description: Моя трансформация, написанная на Python

# Настройте интуитивно и привычно
config:
materialized: table
tags: ['python']

# Тестируйте результаты моей Python трансформации
columns:
- name: id
# Стандартная валидация для «зерна» (grain) результатов Python
data_tests:
- unique
- not_null
data_tests:
# Напишите собственную логику валидации (на SQL) для результатов Python
- custom_generic_test
SQL + Python, вместе наконецSQL + Python, вместе наконец

Предварительные условия для Python моделей dbt включают использование адаптера для платформы данных, поддерживающей полноценное выполнение Python. В Python модели dbt весь Python код выполняется удаленно на платформе. Ничего из этого не выполняется dbt локально. Мы верим в четкое разделение определения модели от выполнения модели. В этом и во многих других аспектах подход dbt к Python моделям отражает его давний подход к моделированию данных в SQL.

Мы написали это руководство, предполагая, что вы знакомы с dbt. Если вы никогда раньше не писали модель dbt, мы рекомендуем начать с чтения dbt Models. На протяжении всего текста мы будем проводить параллели между Python моделями и SQL моделями, а также четко указывать их различия.

Что такое Python модель?

Python модель dbt — это функция, которая читает источники dbt или другие модели, применяет серию трансформаций и возвращает преобразованный набор данных. Операции DataFrame определяют начальные точки, конечное состояние и каждый шаг на пути.

Это похоже на роль CTE в SQL моделях dbt. Мы используем CTE для извлечения данных из вышестоящих наборов данных, определения (и именования) серии значимых трансформаций и завершения с помощью финального оператора select. Вы можете запустить скомпилированную версию SQL модели dbt, чтобы увидеть данные, включенные в результирующее представление или таблицу. Когда вы выполняете dbt run, dbt оборачивает этот запрос в create view, create table или более сложные DDL, чтобы сохранить его результаты в базе данных.

Вместо финального оператора select каждая Python модель возвращает финальный DataFrame. Каждая операция DataFrame "лениво оценивается". В процессе разработки вы можете предварительно просмотреть его данные, используя методы, такие как .show() или .head(). Когда вы запускаете Python модель, полный результат финального DataFrame будет сохранен как таблица в вашем хранилище данных.

Python модели dbt имеют доступ почти ко всем тем же параметрам конфигурации, что и SQL модели. Вы можете тестировать и документировать их, добавлять свойства tags и meta, а также предоставлять доступ к их результатам другим пользователям. Вы можете выбирать их по имени, пути к файлу, конфигурациям, в зависимости от того, являются ли они вышестоящими или нижестоящими по отношению к другой модели, или если они были изменены по сравнению с предыдущим состоянием проекта.

Определение Python модели

Каждая Python модель находится в файле .py в вашей папке models/. Она определяет функцию с именем model(), которая принимает два параметра:

  • dbt: Класс, скомпилированный dbt Core, уникальный для каждой модели, позволяет вам запускать ваш Python код в контексте вашего проекта dbt и DAG.
  • session: Класс, представляющий соединение вашей платформы данных с Python бэкендом. Сессия необходима для чтения таблиц как DataFrame и записи DataFrame обратно в таблицы. В PySpark, по соглашению, SparkSession называется spark и доступен глобально. Для согласованности между платформами мы всегда передаем его в функцию model как явный аргумент, называемый session.

Функция model() должна возвращать один DataFrame. В Snowpark (Snowflake) это может быть Snowpark или pandas DataFrame. В BigQuery это может быть BigFrames, pandas или Spark DataFrame. При использовании PySpark (Databricks) это может быть Spark, pandas или pandas-on-Spark DataFrame. Подробнее о выборе между pandas и нативными DataFrame см. в разделе DataFrame API + syntax.

Когда вы выполняете dbt run --select python_model, dbt подготовит и передаст оба аргумента (dbt и session). Все, что вам нужно сделать, это определить функцию. Вот как должна выглядеть каждая Python модель:

models/my_python_model.py
def model(dbt, session):

...

return final_df

Ссылки на другие модели

Python модели полностью участвуют в направленном ациклическом графе (DAG) трансформаций dbt. Используйте метод dbt.ref() в Python модели, чтобы читать данные из других моделей (SQL или Python). Если вы хотите читать напрямую из исходной таблицы, используйте dbt.source(). Эти методы возвращают DataFrame, указывающие на вышестоящий источник, модель, seed или snapshot.

models/my_python_model.py
def model(dbt, session):

# DataFrame, представляющий вышестоящую модель
upstream_model = dbt.ref("upstream_model_name")

# DataFrame, представляющий вышестоящий источник
upstream_source = dbt.source("upstream_source_name", "table_name")

...

Конечно, вы можете использовать ref() для вашей Python модели в нижестоящих SQL моделях:

models/downstream_model.sql
with upstream_python_model as (

select * from {{ ref('my_python_model') }}

),

...
предупреждение

Ссылки на эфемерные модели в настоящее время не поддерживаются (см. запрос на добавление функции)

Начиная с версии dbt 1.8, Python-модели также поддерживают динамические конфигурации внутри Python f-строк. Это позволяет задавать более гибкие и динамические настройки моделей непосредственно в вашем Python-коде. Например:

models/my_python_model.py
# Ранее попытка доступа к значению конфигурации таким образом приводила к None
print(f"{dbt.config.get('my_var')}") # Вывод до изменения: None

# Теперь вы можете получить доступ к фактическому значению конфигурации
# Предполагая, что 'my_var' настроен на 5 для текущей модели
print(f"{dbt.config.get('my_var')}") # Вывод после изменения: 5

Это также означает, что вы можете использовать dbt.config.get() в Python моделях, чтобы гарантировать, что значения конфигурации эффективно доступны и могут быть использованы в Python f-строках.

Настройка Python моделей

Как и SQL модели, Python модели можно настроить тремя способами:

  1. В dbt_project.yml, где вы можете настроить множество моделей одновременно
  2. В отдельном .yml файле в каталоге models/
  3. Внутри файла .py модели, используя метод dbt.config()

Вызов метода dbt.config() установит конфигурации для вашей модели в вашем .py файле, аналогично макросу {{ config() }} в файлах .sql моделей:

models/my_python_model.py
def model(dbt, session):

# установка конфигурации
dbt.config(materialized="table")

Существует ограничение на сложность конфигурации, которую можно задать с помощью метода dbt.config(). Он принимает только литеральные значения (строки, булевы значения и числовые типы), а также динамическую конфигурацию. Передача другой функции или более сложной структуры данных невозможна.

Причина в том, что dbt статически анализирует аргументы, переданные в config(), во время парсинга модели, не выполняя ваш Python‑код. Если вам необходимо задать более сложную конфигурацию, мы рекомендуем определить её с помощью свойства config в YAML‑файле свойств.

Доступ к контексту проекта

Python модели dbt не используют Jinja для рендеринга скомпилированного кода. Python модели имеют ограниченный доступ к глобальным контекстам проекта по сравнению с SQL моделями. Этот контекст доступен из класса dbt, переданного в качестве аргумента функции model().

Из коробки класс dbt поддерживает:

  • Возвращение DataFrame, ссылающихся на местоположения других ресурсов: dbt.ref() + dbt.source()
  • Доступ к местоположению базы данных текущей модели: dbt.this() (также: dbt.this.database, .schema, .identifier)
  • Определение, является ли текущий запуск модели инкрементным: dbt.is_incremental

Этот контекст можно расширить, «получив» необходимые значения с помощью dbt.config.get() после того, как они будут настроены в конфигурации модели. Метод dbt.config.get() поддерживает динамический доступ к конфигурациям внутри Python‑моделей, что повышает гибкость логики модели. Это включает такие входные параметры, как var, env_var и target. Если вы хотите использовать эти значения для условной логики в вашей модели, необходимо задать их через специальную конфигурацию в properties YAML‑файле:

models/config.yml

models:
- name: my_python_model
config:
materialized: table
target_name: "{{ target.name }}"
specific_var: "{{ var('SPECIFIC_VAR') }}"
specific_env_var: "{{ env_var('SPECIFIC_ENV_VAR') }}"

Затем, в Python коде модели, используйте функцию dbt.config.get(), чтобы получить доступ к значениям конфигураций, которые были установлены:

models/my_python_model.py
def model(dbt, session):
target_name = dbt.config.get("target_name")
specific_var = dbt.config.get("specific_var")
specific_env_var = dbt.config.get("specific_env_var")

orders_df = dbt.ref("fct_orders")

# ограничение данных в dev
if target_name == "dev":
orders_df = orders_df.limit(500)

Доступ к пользовательским значениям meta

Чтобы хранить пользовательские значения, используйте конфигурацию meta. Например, если у вас есть модель с именем my_python_model и вы хотите хранить пользовательские значения, вы можете сделать следующее:

models/schema.yml

models:
- name: my_python_model
config:
meta:
custom_value: "111"
another_value: "abc"

Затем получите к ним доступ в вашей Python-модели, используя метод dbt.config.get(), чтобы сначала обратиться к объекту meta, а затем получить ваши пользовательские значения:

models/my_python_model.py
def model(dbt, session):
# First, get the meta object
meta = dbt.config.get("meta")

# Then access your custom values from meta
custom_value = meta.get("custom_value")
another_value = meta.get("another_value")

# Use your custom values in your model logic
orders_df = dbt.ref("fct_orders")
...

Динамические конфигурации

В дополнение к существующим методам настройки Python моделей, вы также имеете динамический доступ к значениям конфигурации, установленным с помощью dbt.config() в Python моделях, используя f-строки. Это увеличивает возможности для пользовательской логики и управления конфигурацией.

models/my_python_model.py
def model(dbt, session):
dbt.config(materialized="table")

# Динамический доступ к конфигурации в Python f-строках,
# что позволяет в реальном времени получать и использовать значения конфигурации.
# Предполагая, что 'my_var' установлен на 5, это выведет: Dynamic config value: 5
print(f"Dynamic config value: {dbt.config.get('my_var')}")

Материализации

Python модели поддерживают следующие материализации:

  • table (по умолчанию)
  • incremental

Инкрементные Python модели поддерживают все те же инкрементные стратегии, что и их SQL аналоги. Конкретные поддерживаемые стратегии зависят от вашего адаптера. Например, инкрементные модели поддерживаются на BigQuery с Dataproc для стратегии merge; стратегия insert_overwrite пока не поддерживается.

Python модели не могут быть материализованы как view или ephemeral. Python не поддерживается для ресурсов, отличных от моделей (например, тестов и снимков).

Для инкрементных моделей, как и для SQL моделей, вам нужно фильтровать входящие таблицы только на новые строки данных:

models/my_python_model.py
import snowflake.snowpark.functions as F

def model(dbt, session):
dbt.config(materialized = "incremental")
df = dbt.ref("upstream_table")

if dbt.is_incremental:

# только новые строки по сравнению с максимумом в текущей таблице
max_from_this = f"select max(updated_at) from {dbt.this}"
df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])

# или только строки за последние 3 дня
df = df.filter(df.updated_at >= F.dateadd("day", F.lit(-3), F.current_timestamp()))

...

return df

Специфическая функциональность Python

Определение функций

В дополнение к определению функции model, Python модель может импортировать другие функции или определять свои собственные. Вот пример на Snowpark, определяющий пользовательскую функцию add_one:

models/my_python_model.py
def add_one(x):
return x + 1

def model(dbt, session):
dbt.config(materialized="table")
temps_df = dbt.ref("temperatures")

# немного согреем
df = temps_df.withColumn("degree_plus_one", add_one(temps_df["degree"]))
return df

В настоящее время функции Python, определенные в одной модели dbt, не могут быть импортированы и повторно использованы в других моделях. Обратитесь к Повторное использование кода для потенциальных рассматриваемых шаблонов.

Использование пакетов PyPI

Вы также можете определять функции, которые зависят от сторонних пакетов, при условии, что эти пакеты установлены и доступны для Python runtime на вашей платформе данных.

В этом примере мы используем пакет holidays, чтобы определить, является ли заданная дата праздничным днём во Франции. В приведённом ниже коде для простоты и единообразия между платформами используется API pandas. Точный синтаксис, а также необходимость рефакторинга для многонодовой обработки, всё ещё могут различаться.

models/my_python_model.py
import holidays

def is_holiday(date_col):
# Chez Jaffle
french_holidays = holidays.France()
is_holiday = (date_col in french_holidays)
return is_holiday

def model(dbt, session):
dbt.config(
materialized = "table",
packages = ["holidays"]
)

orders_df = dbt.ref("stg_orders")

df = orders_df.to_pandas()

# применяем нашу функцию
# (столбцы должны быть в верхнем регистре на Snowpark)
df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday)
df["ORDER_DATE"].dt.tz_localize('UTC') # преобразование из Number/Long в tz-aware Datetime

# возвращаем финальный набор данных (Pandas DataFrame)
return df

Настройка пакетов

Мы рекомендуем вам настраивать необходимые пакеты и версии, чтобы dbt мог отслеживать их в метаданных проекта. Эта конфигурация требуется для реализации на некоторых платформах. Если вам нужны конкретные версии пакетов, укажите их.

models/my_python_model.py
def model(dbt, session):
dbt.config(
packages = ["numpy==1.23.1", "scikit-learn"]
)
models/config.yml

models:
- name: my_python_model
config:
packages:
- "numpy==1.23.1"
- scikit-learn

Пользовательские функции (UDF)

Вы можете использовать декоратор @udf или функцию udf, чтобы определить «анонимную» функцию и вызвать её внутри преобразований DataFrame в вашей функции model. Это типичный паттерн для применения более сложных функций в виде операций над DataFrame, особенно в тех случаях, когда этим функциям требуются зависимости из сторонних пакетов.

подсказка

Вы также можете определять SQL- или Python-UDF как ресурсы первого класса в каталоге /functions с соответствующим файлом YAML. dbt собирает их как часть DAG, и вы ссылаетесь на них из SQL, используя {{ function('my_udf') }}. Эти UDF являются переиспользуемыми в разных инструментах (BI, ноутбуки, SQL-клиенты), поскольку они находятся в вашем хранилище данных.

models/my_python_model.py
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import numpy

def register_udf_add_random():
add_random = F.udf(
# используйте синтаксис 'lambda' для простого функционального поведения
lambda x: x + numpy.random.normal(),
return_type=T.FloatType(),
input_types=[T.FloatType()]
)
return add_random

def model(dbt, session):

dbt.config(
materialized = "table",
packages = ["numpy"]
)

temps_df = dbt.ref("temperatures")

add_random = register_udf_add_random()

# согреем, кто знает на сколько
df = temps_df.withColumn("degree_plus_random", add_random("degree"))
return df

Примечание: Из-за ограничения Snowpark в настоящее время невозможно зарегистрировать сложные именованные UDF в хранимых процедурах и, следовательно, в Python моделях dbt. Мы планируем добавить нативную поддержку Python UDF в качестве ресурса проекта/DAG в будущих выпусках. На данный момент, если вы хотите создать "векторизованную" Python UDF через Batch API, мы рекомендуем либо:

Повторное использование кода

Чтобы повторно использовать Python‑функцию в нескольких моделях dbt, вы можете определить Python UDF в каталоге /functions вместе с соответствующим YAML‑файлом. Эти UDF разворачиваются в вашем хранилище данных и могут переиспользоваться разными инструментами (BI‑системами, ноутбуками, SQL‑клиентами).

В будущем мы также рассматриваем возможность добавления поддержки Private Python packages. Помимо импорта переиспользуемых функций из публичных пакетов PyPI, многие платформы данных поддерживают загрузку пользовательских Python‑артефактов и их регистрацию в виде пакетов. Процесс загрузки отличается от платформы к платформе, но сам import вашего кода выглядит одинаково.

❓ Вопросы dbt
  • Как dbt может помочь пользователям при загрузке или инициализации приватных Python-ассетов? Является ли это новой формой dbt deps?
  • Как dbt может поддержать пользователей, которые хотят тестировать пользовательские функции? Если они определены как UDF: «юнит‑тестирование» прямо в базе данных? Если это «чистые» функции в пакетах: стоит ли поощрять использование pytest?

💬 Обсуждение: "Python модели: пакет, хранение артефактов/объектов и управление UDF в dbt"

API и синтаксис DataFrame

За последнее десятилетие большинство людей, пишущих трансформации данных на Python, приняли DataFrame в качестве своей общей абстракции. dbt следует этой конвенции, возвращая ref() и source() как DataFrame, и ожидает, что все Python модели будут возвращать DataFrame.

DataFrame — это двумерная структура данных (строки и столбцы). Он поддерживает удобные методы для преобразования этих данных и создания новых столбцов из вычислений, выполненных на существующих столбцах. Он также предлагает удобные способы предварительного просмотра данных при локальной разработке или в ноутбуке.

Примерно на этом согласие и заканчивается. Существует множество фреймворков с собственными синтаксисами и API для работы с DataFrame. Библиотека pandas предложила один из самых первых API для DataFrame, и её синтаксис является самым распространённым для изучения среди начинающих специалистов по данным. Большинство более новых API DataFrame совместимы с синтаксисом в стиле pandas, хотя немногие могут обеспечить полную взаимную совместимость. Это справедливо и для BigQuery DataFrames, Snowpark и PySpark, у которых есть собственные API DataFrame.

При разработке Python модели вы будете задаваться следующими вопросами:

Почему pandas? — Это самый распространенный API для DataFrame. Он упрощает исследование выборочных данных и разработку трансформаций локально. Вы можете "продвигать" свой код как есть в модели dbt и запускать его в производственной среде для небольших наборов данных.

Почему не pandas? — Производительность. pandas выполняет "одноузловые" трансформации, которые не могут воспользоваться параллелизмом и распределенными вычислениями, предлагаемыми современными хранилищами данных. Это быстро становится проблемой, когда вы работаете с большими наборами данных. Некоторые платформы данных поддерживают оптимизации для кода, написанного с использованием API DataFrame pandas, предотвращая необходимость в значительных рефакторингах. Например, pandas на PySpark предлагает поддержку 95% функциональности pandas, используя тот же API, при этом все еще используя параллельную обработку.

❓ Вопросы dbt
  • При разработке новой Python модели dbt, следует ли рекомендовать синтаксис в стиле pandas для быстрой итерации, а затем рефакторинг?
  • Какие библиотеки с открытым исходным кодом предоставляют убедительные абстракции для различных движков данных и специфических для поставщиков API?
  • Должен ли dbt пытаться играть долгосрочную роль в стандартизации среди них?

💬 Обсуждение: "Python модели: проблема pandas (и возможное решение)"

Ограничения

Python модели имеют возможности, которых нет у SQL моделей. У них также есть некоторые недостатки по сравнению с SQL моделями:

  • Время и стоимость. Python модели работают медленнее, чем SQL модели, и облачные ресурсы, которые их запускают, могут быть дороже. Запуск Python требует более общих вычислительных ресурсов. Эти вычислительные ресурсы могут иногда находиться на отдельной службе или архитектуре от ваших SQL моделей. Однако: Мы считаем, что развертывание Python моделей через dbt — с единой линией, тестированием и документацией — с человеческой точки зрения, значительно быстрее и дешевле. По сравнению с этим, развертывание отдельной инфраструктуры для оркестрации Python трансформаций в производственной среде и использование различных инструментов для интеграции с dbt занимает гораздо больше времени и дороже.

  • Различия в синтаксисе еще более выражены. За эти годы dbt сделал многое, используя шаблоны диспетчеризации и пакеты, такие как dbt_utils, чтобы абстрагировать различия в SQL диалектах среди популярных хранилищ данных. Python предлагает гораздо более широкое поле для игры. Если есть пять способов сделать что-то в SQL, то в Python их 500, все с различной производительностью и соответствием стандартам. Эти варианты могут быть ошеломляющими. Как поддерживающие dbt, мы будем учиться у передовых проектов, решающих эту проблему, и делиться рекомендациями по мере их разработки.

  • Эти возможности очень новые. По мере того, как хранилища данных разрабатывают новые функции, мы ожидаем, что они предложат более дешевые, быстрые и интуитивно понятные механизмы для развертывания Python трансформаций. Мы оставляем за собой право изменить основную реализацию для выполнения Python моделей в будущих выпусках. Наша приверженность вам заключается в коде в ваших файлах модели .py, следуя документированным возможностям и рекомендациям, которые мы предоставляем здесь.

  • Отсутствие поддержки print(). Платформа данных запускает и компилирует вашу Python модель без надзора dbt. Это означает, что она не отображает вывод команд, таких как встроенная функция Python print() в логах dbt.

В качестве общего правила: если преобразование можно одинаково хорошо написать как на SQL, так и на Python, мы считаем, что хорошо написанный SQL предпочтительнее. Он более доступен для большего числа коллег, и на нём проще писать код, который будет производительным при работе с большими объёмами данных. Если же преобразование нельзя написать на SQL, или в ситуации, когда десять строк элегантного и хорошо прокомментированного Python могут заменить тысячу строк трудночитаемого Jinja-SQL, стоит выбрать Python.

Нашли ошибку?

0
Loading