Перейти к основному содержимому

Python модели

Обратите внимание, что только определенные платформы данных поддерживают модели dbt-py.

Мы рекомендуем вам:

Обзор

Python модели dbt (dbt-py) могут помочь вам решить задачи, которые невозможно решить с помощью SQL. Вы можете выполнять анализы, используя инструменты, доступные в экосистеме Python с открытым исходным кодом, включая передовые пакеты для науки о данных и статистики. Ранее вам требовалась бы отдельная инфраструктура и оркестрация для выполнения Python трансформаций в производственной среде. Python трансформации, определенные в dbt, являются моделями в вашем проекте с теми же возможностями тестирования, документации и отслеживания.

models/my_python_model.py
import ...

def model(dbt, session):

my_sql_model_df = dbt.ref("my_sql_model")

final_df = ... # то, что нельзя написать на SQL!

return final_df
models/config.yml
version: 2

models:
- name: my_python_model

# Документируйте в том же коде
description: Моя трансформация, написанная на Python

# Настройте интуитивно и привычно
config:
materialized: table
tags: ['python']

# Тестируйте результаты моей Python трансформации
columns:
- name: id
# Стандартная проверка для 'grain' результатов Python
tests:
- unique
- not_null
tests:
# Напишите свою собственную логику проверки (на SQL) для результатов Python
- custom_generic_test
SQL + Python, вместе наконецSQL + Python, вместе наконец

Предварительные условия для Python моделей dbt включают использование адаптера для платформы данных, поддерживающей полноценное выполнение Python. В Python модели dbt весь Python код выполняется удаленно на платформе. Ничего из этого не выполняется dbt локально. Мы верим в четкое разделение определения модели от выполнения модели. В этом и во многих других аспектах подход dbt к Python моделям отражает его давний подход к моделированию данных в SQL.

Мы написали это руководство, предполагая, что вы знакомы с dbt. Если вы никогда раньше не писали модель dbt, мы рекомендуем начать с чтения dbt Models. На протяжении всего текста мы будем проводить параллели между Python моделями и SQL моделями, а также четко указывать их различия.

Что такое Python модель?

Python модель dbt — это функция, которая читает источники dbt или другие модели, применяет серию трансформаций и возвращает преобразованный набор данных. Операции DataFrame определяют начальные точки, конечное состояние и каждый шаг на пути.

Это похоже на роль CTE в SQL моделях dbt. Мы используем CTE для извлечения данных из вышестоящих наборов данных, определения (и именования) серии значимых трансформаций и завершения с помощью финального оператора select. Вы можете запустить скомпилированную версию SQL модели dbt, чтобы увидеть данные, включенные в результирующее представление или таблицу. Когда вы выполняете dbt run, dbt оборачивает этот запрос в create view, create table или более сложные DDL, чтобы сохранить его результаты в базе данных.

Вместо финального оператора select каждая Python модель возвращает финальный DataFrame. Каждая операция DataFrame "лениво оценивается". В процессе разработки вы можете предварительно просмотреть его данные, используя методы, такие как .show() или .head(). Когда вы запускаете Python модель, полный результат финального DataFrame будет сохранен как таблица в вашем хранилище данных.

Python модели dbt имеют доступ почти ко всем тем же параметрам конфигурации, что и SQL модели. Вы можете тестировать и документировать их, добавлять свойства tags и meta, а также предоставлять доступ к их результатам другим пользователям. Вы можете выбирать их по имени, пути к файлу, конфигурациям, в зависимости от того, являются ли они вышестоящими или нижестоящими по отношению к другой модели, или если они были изменены по сравнению с предыдущим состоянием проекта.

Определение Python модели

Каждая Python модель находится в файле .py в вашей папке models/. Она определяет функцию с именем model(), которая принимает два параметра:

  • dbt: Класс, скомпилированный dbt Core, уникальный для каждой модели, позволяет вам запускать ваш Python код в контексте вашего проекта dbt и DAG.
  • session: Класс, представляющий соединение вашей платформы данных с Python бэкендом. Сессия необходима для чтения таблиц как DataFrame и записи DataFrame обратно в таблицы. В PySpark, по соглашению, SparkSession называется spark и доступен глобально. Для согласованности между платформами мы всегда передаем его в функцию model как явный аргумент, называемый session.

Функция model() должна возвращать один DataFrame. На Snowpark (Snowflake) это может быть Snowpark или pandas DataFrame. Через PySpark (Databricks + BigQuery) это может быть Spark, pandas или pandas-on-Spark DataFrame. Для получения дополнительной информации о выборе между pandas и нативными DataFrame, см. API и синтаксис DataFrame.

Когда вы выполняете dbt run --select python_model, dbt подготовит и передаст оба аргумента (dbt и session). Все, что вам нужно сделать, это определить функцию. Вот как должна выглядеть каждая Python модель:

models/my_python_model.py
def model(dbt, session):

...

return final_df

Ссылки на другие модели

Python модели полностью участвуют в направленном ациклическом графе (DAG) трансформаций dbt. Используйте метод dbt.ref() в Python модели, чтобы читать данные из других моделей (SQL или Python). Если вы хотите читать напрямую из исходной таблицы, используйте dbt.source(). Эти методы возвращают DataFrame, указывающие на вышестоящий источник, модель, seed или snapshot.

models/my_python_model.py
def model(dbt, session):

# DataFrame, представляющий вышестоящую модель
upstream_model = dbt.ref("upstream_model_name")

# DataFrame, представляющий вышестоящий источник
upstream_source = dbt.source("upstream_source_name", "table_name")

...

Конечно, вы можете использовать ref() для вашей Python модели в нижестоящих SQL моделях:

models/downstream_model.sql
with upstream_python_model as (

select * from {{ ref('my_python_model') }}

),

...
предупреждение

Ссылки на эфемерные модели в настоящее время не поддерживаются (см. запрос на добавление функции)

Настройка Python моделей

Как и SQL модели, Python модели можно настроить тремя способами:

  1. В dbt_project.yml, где вы можете настроить множество моделей одновременно
  2. В отдельном .yml файле в каталоге models/
  3. Внутри файла .py модели, используя метод dbt.config()

Вызов метода dbt.config() установит конфигурации для вашей модели в вашем .py файле, аналогично макросу {{ config() }} в файлах .sql моделей:

models/my_python_model.py
def model(dbt, session):

# установка конфигурации
dbt.config(materialized="table")

Существует ограничение на сложность, которую вы можете достичь с помощью метода dbt.config(). Он принимает только буквальные значения (строки, логические и числовые типы) и динамическую конфигурацию. Передача другой функции или более сложной структуры данных невозможна. Причина в том, что dbt статически анализирует аргументы для config() при разборе вашей модели без выполнения вашего Python кода. Если вам нужно установить более сложную конфигурацию, мы рекомендуем определить ее, используя свойство config в YAML файле.

Доступ к контексту проекта

Python модели dbt не используют Jinja для рендеринга скомпилированного кода. Python модели имеют ограниченный доступ к глобальным контекстам проекта по сравнению с SQL моделями. Этот контекст доступен из класса dbt, переданного в качестве аргумента функции model().

Из коробки класс dbt поддерживает:

  • Возвращение DataFrame, ссылающихся на местоположения других ресурсов: dbt.ref() + dbt.source()
  • Доступ к местоположению базы данных текущей модели: dbt.this() (также: dbt.this.database, .schema, .identifier)
  • Определение, является ли текущий запуск модели инкрементным: dbt.is_incremental

Возможно расширение этого контекста путем "получения" их с помощью dbt.config.get() после того, как они настроены в конфигурации модели. Начиная с dbt v1.8, метод dbt.config.get() поддерживает динамический доступ к конфигурациям в Python моделях, увеличивая гибкость в логике модели. Это включает входные данные, такие как var, env_var и target. Если вы хотите использовать эти значения для условной логики в вашей модели, мы требуем их настройки через конфигурацию в отдельном YAML файле:

models/config.yml
version: 2

models:
- name: my_python_model
config:
materialized: table
target_name: "{{ target.name }}"
specific_var: "{{ var('SPECIFIC_VAR') }}"
specific_env_var: "{{ env_var('SPECIFIC_ENV_VAR') }}"

Затем, в Python коде модели, используйте функцию dbt.config.get(), чтобы получить доступ к значениям конфигураций, которые были установлены:

models/my_python_model.py
def model(dbt, session):
target_name = dbt.config.get("target_name")
specific_var = dbt.config.get("specific_var")
specific_env_var = dbt.config.get("specific_env_var")

orders_df = dbt.ref("fct_orders")

# ограничение данных в dev
if target_name == "dev":
orders_df = orders_df.limit(500)

Материализации

Python модели поддерживают следующие материализации:

  • table (по умолчанию)
  • incremental

Инкрементные Python модели поддерживают все те же инкрементные стратегии, что и их SQL аналоги. Конкретные поддерживаемые стратегии зависят от вашего адаптера. Например, инкрементные модели поддерживаются на BigQuery с Dataproc для стратегии merge; стратегия insert_overwrite пока не поддерживается.

Python модели не могут быть материализованы как view или ephemeral. Python не поддерживается для ресурсов, отличных от моделей (например, тестов и снимков).

Для инкрементных моделей, как и для SQL моделей, вам нужно фильтровать входящие таблицы только на новые строки данных:

models/my_python_model.py
import snowflake.snowpark.functions as F

def model(dbt, session):
dbt.config(materialized = "incremental")
df = dbt.ref("upstream_table")

if dbt.is_incremental:

# только новые строки по сравнению с максимумом в текущей таблице
max_from_this = f"select max(updated_at) from {dbt.this}"
df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])

# или только строки за последние 3 дня
df = df.filter(df.updated_at >= F.dateadd("day", F.lit(-3), F.current_timestamp()))

...

return df

Специфическая функциональность Python

Определение функций

В дополнение к определению функции model, Python модель может импортировать другие функции или определять свои собственные. Вот пример на Snowpark, определяющий пользовательскую функцию add_one:

models/my_python_model.py
def add_one(x):
return x + 1

def model(dbt, session):
dbt.config(materialized="table")
temps_df = dbt.ref("temperatures")

# немного согреем
df = temps_df.withColumn("degree_plus_one", add_one(temps_df["degree"]))
return df

В настоящее время функции Python, определенные в одной модели dbt, не могут быть импортированы и повторно использованы в других моделях. Обратитесь к Повторное использование кода для потенциальных рассматриваемых шаблонов.

Использование пакетов PyPI

Вы также можете определять функции, которые зависят от сторонних пакетов, при условии, что эти пакеты установлены и доступны для выполнения Python на вашей платформе данных. См. заметки о "Установке пакетов" для определенных платформ данных.

В этом примере мы используем пакет holidays, чтобы определить, является ли данная дата праздником во Франции. Код ниже использует API pandas для простоты и согласованности между платформами. Точный синтаксис и необходимость рефакторинга для многозадачной обработки все еще различаются.

models/my_python_model.py
import holidays

def is_holiday(date_col):
# Chez Jaffle
french_holidays = holidays.France()
is_holiday = (date_col in french_holidays)
return is_holiday

def model(dbt, session):
dbt.config(
materialized = "table",
packages = ["holidays"]
)

orders_df = dbt.ref("stg_orders")

df = orders_df.to_pandas()

# применяем нашу функцию
# (столбцы должны быть в верхнем регистре на Snowpark)
df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday)
df["ORDER_DATE"].dt.tz_localize('UTC') # преобразование из Number/Long в tz-aware Datetime

# возвращаем финальный набор данных (Pandas DataFrame)
return df

Настройка пакетов

Мы рекомендуем вам настраивать необходимые пакеты и версии, чтобы dbt мог отслеживать их в метаданных проекта. Эта конфигурация требуется для реализации на некоторых платформах. Если вам нужны конкретные версии пакетов, укажите их.

models/my_python_model.py
def model(dbt, session):
dbt.config(
packages = ["numpy==1.23.1", "scikit-learn"]
)
models/config.yml
version: 2

models:
- name: my_python_model
config:
packages:
- "numpy==1.23.1"
- scikit-learn

Пользовательские функции (UDF)

Вы можете использовать декоратор @udf или функцию udf, чтобы определить "анонимную" функцию и вызвать ее в преобразовании DataFrame функции model. Это типичный шаблон для применения более сложных функций в качестве операций DataFrame, особенно если эти функции требуют входных данных от сторонних пакетов.

models/my_python_model.py
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import numpy

def register_udf_add_random():
add_random = F.udf(
# используйте синтаксис 'lambda' для простого функционального поведения
lambda x: x + numpy.random.normal(),
return_type=T.FloatType(),
input_types=[T.FloatType()]
)
return add_random

def model(dbt, session):

dbt.config(
materialized = "table",
packages = ["numpy"]
)

temps_df = dbt.ref("temperatures")

add_random = register_udf_add_random()

# согреем, кто знает на сколько
df = temps_df.withColumn("degree_plus_random", add_random("degree"))
return df

Примечание: Из-за ограничения Snowpark в настоящее время невозможно зарегистрировать сложные именованные UDF в хранимых процедурах и, следовательно, в Python моделях dbt. Мы планируем добавить нативную поддержку Python UDF в качестве ресурса проекта/DAG в будущих выпусках. На данный момент, если вы хотите создать "векторизованную" Python UDF через Batch API, мы рекомендуем либо:

Повторное использование кода

В настоящее время функции Python, определенные в одной модели dbt, не могут быть импортированы и повторно использованы в других моделях. Это то, что dbt Labs хотел бы поддерживать, поэтому мы рассматриваем два шаблона:

  • Создание и регистрация "именованных" UDF — Этот процесс отличается на разных платформах данных и имеет некоторые ограничения по производительности. Например, Snowpark поддерживает векторизованные UDF для функций, подобных pandas, которые можно выполнять параллельно.
  • Частные Python пакеты — В дополнение к импорту повторно используемых функций из публичных пакетов PyPI, многие платформы данных поддерживают загрузку пользовательских Python активов и регистрацию их как пакетов. Процесс загрузки выглядит по-разному на разных платформах, но фактический import вашего кода выглядит одинаково.
❓ Вопросы dbt
  • Должен ли dbt играть роль в абстрагировании над UDF? Должен ли dbt поддерживать новый тип узла DAG, function? Будет ли основным случаем использования повторное использование кода в Python моделях или определение функций на языке Python, которые можно вызывать из SQL моделей?
  • Как dbt может помочь пользователям при загрузке или инициализации частных Python активов? Это новая форма dbt deps?
  • Как dbt может поддерживать пользователей, которые хотят тестировать пользовательские функции? Если они определены как UDF: "юнит-тестирование" в базе данных? Если "чистые" функции в пакетах: поощрять использование pytest?

💬 Обсуждение: "Python модели: пакет, хранение артефактов/объектов и управление UDF в dbt"

API и синтаксис DataFrame

За последнее десятилетие большинство людей, пишущих трансформации данных на Python, приняли DataFrame в качестве своей общей абстракции. dbt следует этой конвенции, возвращая ref() и source() как DataFrame, и ожидает, что все Python модели будут возвращать DataFrame.

DataFrame — это двумерная структура данных (строки и столбцы). Он поддерживает удобные методы для преобразования этих данных и создания новых столбцов из вычислений, выполненных на существующих столбцах. Он также предлагает удобные способы предварительного просмотра данных при локальной разработке или в ноутбуке.

На этом согласие заканчивается. Существует множество фреймворков с собственными синтаксисами и API для DataFrame. Библиотека pandas предложила один из оригинальных API для DataFrame, и ее синтаксис является наиболее распространенным для изучения новыми специалистами по данным. Большинство новых API для DataFrame совместимы с синтаксисом pandas, хотя немногие могут предложить идеальную совместимость. Это верно для Snowpark и PySpark, у которых есть свои собственные API для DataFrame.

При разработке Python модели вы будете задаваться следующими вопросами:

Почему pandas? — Это самый распространенный API для DataFrame. Он упрощает исследование выборочных данных и разработку трансформаций локально. Вы можете "продвигать" свой код как есть в модели dbt и запускать его в производственной среде для небольших наборов данных.

Почему не pandas? — Производительность. pandas выполняет "одноузловые" трансформации, которые не могут воспользоваться параллелизмом и распределенными вычислениями, предлагаемыми современными хранилищами данных. Это быстро становится проблемой, когда вы работаете с большими наборами данных. Некоторые платформы данных поддерживают оптимизации для кода, написанного с использованием API DataFrame pandas, предотвращая необходимость в значительных рефакторингах. Например, pandas на PySpark предлагает поддержку 95% функциональности pandas, используя тот же API, при этом все еще используя параллельную обработку.

❓ Вопросы dbt
  • При разработке новой Python модели dbt, следует ли рекомендовать синтаксис в стиле pandas для быстрой итерации, а затем рефакторинг?
  • Какие библиотеки с открытым исходным кодом предоставляют убедительные абстракции для различных движков данных и специфических для поставщиков API?
  • Должен ли dbt пытаться играть долгосрочную роль в стандартизации среди них?

💬 Обсуждение: "Python модели: проблема pandas (и возможное решение)"

Ограничения

Python модели имеют возможности, которых нет у SQL моделей. У них также есть некоторые недостатки по сравнению с SQL моделями:

  • Время и стоимость. Python модели работают медленнее, чем SQL модели, и облачные ресурсы, которые их запускают, могут быть дороже. Запуск Python требует более общих вычислительных ресурсов. Эти вычислительные ресурсы могут иногда находиться на отдельной службе или архитектуре от ваших SQL моделей. Однако: Мы считаем, что развертывание Python моделей через dbt — с единой линией, тестированием и документацией — с человеческой точки зрения, значительно быстрее и дешевле. По сравнению с этим, развертывание отдельной инфраструктуры для оркестрации Python трансформаций в производственной среде и использование различных инструментов для интеграции с dbt занимает гораздо больше времени и дороже.

  • Различия в синтаксисе еще более выражены. За эти годы dbt сделал многое, используя шаблоны диспетчеризации и пакеты, такие как dbt_utils, чтобы абстрагировать различия в SQL диалектах среди популярных хранилищ данных. Python предлагает гораздо более широкое поле для игры. Если есть пять способов сделать что-то в SQL, то в Python их 500, все с различной производительностью и соответствием стандартам. Эти варианты могут быть ошеломляющими. Как поддерживающие dbt, мы будем учиться у передовых проектов, решающих эту проблему, и делиться рекомендациями по мере их разработки.

  • Эти возможности очень новые. По мере того, как хранилища данных разрабатывают новые функции, мы ожидаем, что они предложат более дешевые, быстрые и интуитивно понятные механизмы для развертывания Python трансформаций. Мы оставляем за собой право изменить основную реализацию для выполнения Python моделей в будущих выпусках. Наша приверженность вам заключается в коде в ваших файлах модели .py, следуя документированным возможностям и рекомендациям, которые мы предоставляем здесь.

  • Отсутствие поддержки print(). Платформа данных запускает и компилирует вашу Python модель без надзора dbt. Это означает, что она не отображает вывод команд, таких как встроенная функция Python print() в логах dbt.

Как общее правило, если есть трансформация, которую вы могли бы написать одинаково хорошо на SQL или Python, мы считаем, что хорошо написанный SQL предпочтительнее: он более доступен для большего числа коллег, и его легче написать так, чтобы он был производительным в масштабе. Если есть трансформация, которую вы не можете написать на SQL, или где десять строк элегантного и хорошо аннотированного Python могут сэкономить вам 1000 строк трудно читаемого Jinja-SQL, Python — это путь.

Определенные платформы данных

В своем первоначальном запуске Python модели поддерживаются на трех из самых популярных платформ данных: Snowflake, Databricks и BigQuery/GCP (через Dataproc). И Databricks, и Dataproc от GCP используют PySpark в качестве фреймворка обработки. Snowflake использует свой собственный фреймворк, Snowpark, который имеет много сходств с PySpark.

Дополнительная настройка: Вам нужно будет признать и принять условия третьих сторон Snowflake, чтобы использовать пакеты Anaconda.

Установка пакетов: Snowpark поддерживает несколько популярных пакетов через Anaconda. Обратитесь к полному списку для получения дополнительной информации. Пакеты устанавливаются при запуске вашей модели. Разные модели могут иметь разные зависимости от пакетов. Если вы используете сторонние пакеты, Snowflake рекомендует использовать выделенный виртуальный склад для лучшей производительности, а не тот, который имеет много одновременных пользователей.

Версия Python: Чтобы указать другую версию Python, используйте следующую конфигурацию:

def model(dbt, session):
dbt.config(
materialized = "table",
python_version="3.11"
)

О "sprocs": dbt отправляет Python модели для выполнения в виде хранимых процедур, которые некоторые люди называют sprocs для краткости. По умолчанию dbt будет использовать временные или анонимные хранимые процедуры Snowpark (документация), которые быстрее и сохраняют историю запросов чище, чем именованные sprocs, содержащие скомпилированный Python код вашей модели. Чтобы отключить эту функцию, установите use_anonymous_sproc: False в конфигурации вашей модели.

Документация: "Руководство разработчика: Snowpark Python"

Сторонние пакеты Snowflake

Чтобы использовать сторонний пакет Snowflake, который недоступен в Snowflake Anaconda, загрузите ваш пакет, следуя этому примеру, а затем настройте параметр imports в Python модели dbt, чтобы ссылаться на zip файл в вашей Snowflake стадии.

Вот полный пример конфигурации с использованием zip файла, включая использование imports в Python модели:


def model(dbt, session):
# Настройка модели
dbt.config(
materialized="table",
imports=["@mystage/mycustompackage.zip"], # Укажите местоположение внешнего пакета
)

# Пример преобразования данных с использованием импортированного пакета
# (Предполагается, что `some_external_package` имеет функцию, которую мы можем вызвать)
data = {
"name": ["Alice", "Bob", "Charlie"],
"score": [85, 90, 88]
}
df = pd.DataFrame(data)

# Обработка данных с помощью внешнего пакета
df["adjusted_score"] = df["score"].apply(lambda x: some_external_package.adjust_score(x))

# Возвращаем DataFrame как результат модели
return df

Для получения дополнительной информации о использовании этой конфигурации обратитесь к документации Snowflake о загрузке и использовании других python пакетов в Snowpark, не опубликованных на канале Anaconda Snowflake.

0