Программные вызовы
В версии v1.5 в dbt Core была добавлена поддержка программных вызовов. Цель этого — предоставить доступ к существующему CLI dbt Core через точку входа Python, чтобы команды верхнего уровня можно было вызывать непосредственно из Python-скрипта или приложения.
Точка входа — это класс dbtRunner, который позволяет вам invoke те же команды, что и в командной строке.
from dbt.cli.main import dbtRunner, dbtRunnerResult
# инициализация
dbt = dbtRunner()
# создание аргументов CLI в виде списка строк
cli_args = ["run", "--select", "tag:my_tag"]
# выполнение команды
res: dbtRunnerResult = dbt.invoke(cli_args)
# проверка результатов
for r in res.result:
print(f"{r.node.name}: {r.status}")
Параллельное выполнение не поддерживается
dbt-core не поддерживает безопасное параллельное выполнение для нескольких вызовов в одном процессе. Это означает, что небезопасно запускать несколько команд dbt одновременно. Это официально не рекомендуется и требует обертки процесса для управления подпроцессами. Это связано с тем, что:
- Параллельный запуск команд может приводить к неожиданному взаимодействию с платформой данных. Например, одновременный запуск
dbt runиdbt buildдля одних и тех же моделей может привести к непредсказуемым результатам. - Каждая команда
dbt-coreвзаимодействует с глобальными переменными Python. Чтобы обеспечить безопасную работу, команды необходимо выполнять в отдельных процессах — этого можно добиться, например, путём запуска подпроцессов или с помощью таких инструментов, как Celery.
Для выполнения безопасного параллельного запуска вы можете использовать dbt CLI или Studio IDE — оба варианта выполняют дополнительную работу по управлению конкурентным выполнением (несколькими процессами) за вас.
dbtRunnerResult
Каждая команда возвращает объект dbtRunnerResult, который имеет три атрибута:
success(bool): Успешно ли выполнена команда.result: Если команда завершилась (успешно или с обработанными ошибками), ее результат(ы). Тип возвращаемого значения зависит от команды.exception: Если при вызове dbt возникла необработанная ошибка и команда не завершилась, то исключение, которое она вызвала.
Существует 1:1 соответствие между кодами выхода CLI и dbtRunnerResult, возвращаемым программным вызовом:
| Loading table... |
Обязательства и оговорки
Начиная с версии dbt Core v1.5, мы берем на себя постоянное обязательство предоставлять Python‑точку входа с функциональным паритетом по отношению к CLI dbt Core. При этом мы оставляем за собой право изменять внутреннюю реализацию, используемую для достижения этой цели. Мы ожидаем, что текущая реализация откроет реальные сценарии использования в краткосрочной и среднесрочной перспективе, пока мы работаем над набором стабильных долгосрочных интерфейсов, которые в конечном итоге ее заменят.
В частности, объекты, возвращаемые каждой командой в dbtRunnerResult.result, не имеют полного контракта и, следовательно, могут изменяться. Некоторые из возвращаемых объектов частично документированы, потому что они частично пересекаются с содержимым артефактов dbt. Как объекты Python, они содержат гораздо больше полей и методов, чем доступно в сериализованных JSON-артефактах. Эти дополнительные поля и методы следует считать внутренними и подверженными изменениям в будущих версиях dbt-core.
Расширенные шаблоны использования
Синтаксис и поддержка этих шаблонов могут измениться в будущих версиях dbt-core.
Цель dbtRunner — предложить паритет с рабочими процессами CLI в программной среде. Существуют несколько расширенных шаблонов использования, которые расширяют возможности CLI.
Повторное использование объектов
Передавайте заранее созданные объекты в dbtRunner, чтобы избежать их повторного создания путем чтения файлов с диска. В настоящее время поддерживается только объект Manifest (содержимое проекта).
from dbt.cli.main import dbtRunner, dbtRunnerResult
from dbt.contracts.graph.manifest import Manifest
# используйте команду 'parse' для загрузки Manifest
res: dbtRunnerResult = dbtRunner().invoke(["parse"])
manifest: Manifest = res.result
# исследуйте manifest
# например, убедитесь, что у каждой публичной модели есть описание
for node in manifest.nodes.values():
if node.resource_type == "model" and node.access == "public":
assert node.description != "", f"{node.name} не имеет описания"
# повторно используйте этот manifest в последующих командах, чтобы пропустить разбор
dbt = dbtRunner(manifest=manifest)
cli_args = ["run", "--select", "tag:my_tag"]
res = dbt.invoke(cli_args)
Регистрация обратных вызовов
Регистрируйте callbacks в EventManager dbt, чтобы получить доступ к структурированным событиям и включить пользовательский логгинг. Текущее поведение обратных вызовов заключается в блокировке последующих шагов; эта функциональность не гарантируется в будущих версиях.
from dbt.cli.main import dbtRunner
from dbt_common.events.base_types import EventMsg
def print_version_callback(event: EventMsg):
if event.info.name == "MainReportVersion":
print(f"Мы рады использовать dbt{event.data.version}")
dbt = dbtRunner(callbacks=[print_version_callback])
dbt.invoke(["list"])
Переопределение параметров
Передавайте параметры в виде именованных аргументов, вместо списка строк в стиле CLI. В настоящее время dbt не будет выполнять никакую проверку или приведение типов для ваших входных данных. Подкоманда должна быть указана в списке в качестве первого позиционного аргумента.
from dbt.cli.main import dbtRunner
dbt = dbtRunner()
# эти команды эквивалентны
dbt.invoke(["--fail-fast", "run", "--select", "tag:my_tag"])
dbt.invoke(["run"], select=["tag:my_tag"], fail_fast=True)