Перейти к основному содержимому

Программные вызовы

В версии v1.5 в dbt Core была добавлена поддержка программных вызовов. Цель этого — предоставить доступ к существующему CLI dbt Core через точку входа Python, чтобы команды верхнего уровня можно было вызывать непосредственно из Python-скрипта или приложения.

Точка входа — это класс dbtRunner, который позволяет вам invoke те же команды, что и в командной строке.

from dbt.cli.main import dbtRunner, dbtRunnerResult

# инициализация
dbt = dbtRunner()

# создание аргументов CLI в виде списка строк
cli_args = ["run", "--select", "tag:my_tag"]

# выполнение команды
res: dbtRunnerResult = dbt.invoke(cli_args)

# проверка результатов
for r in res.result:
print(f"{r.node.name}: {r.status}")

Параллельное выполнение не поддерживается

dbt-core не поддерживает безопасное параллельное выполнение для нескольких вызовов в одном процессе. Это означает, что небезопасно запускать несколько команд dbt одновременно. Это официально не рекомендуется и требует обертки процесса для управления подпроцессами. Это связано с тем, что:

  • Параллельный запуск команд может приводить к неожиданному взаимодействию с платформой данных. Например, одновременный запуск dbt run и dbt build для одних и тех же моделей может привести к непредсказуемым результатам.
  • Каждая команда dbt-core взаимодействует с глобальными переменными Python. Чтобы обеспечить безопасную работу, команды необходимо выполнять в отдельных процессах — этого можно добиться, например, путём запуска подпроцессов или с помощью таких инструментов, как Celery.

Для выполнения безопасного параллельного запуска вы можете использовать dbt CLI или Studio IDE — оба варианта выполняют дополнительную работу по управлению конкурентным выполнением (несколькими процессами) за вас.

dbtRunnerResult

Каждая команда возвращает объект dbtRunnerResult, который имеет три атрибута:

  • success (bool): Успешно ли выполнена команда.
  • result: Если команда завершилась (успешно или с обработанными ошибками), ее результат(ы). Тип возвращаемого значения зависит от команды.
  • exception: Если при вызове dbt возникла необработанная ошибка и команда не завершилась, то исключение, которое она вызвала.

Существует 1:1 соответствие между кодами выхода CLI и dbtRunnerResult, возвращаемым программным вызовом:

СценарийКод выхода CLIsuccessresultexception
Вызов завершен без ошибок0Trueзависит от командыNone
Вызов завершен с по крайней мере одной обработанной ошибкой (например, ошибка теста, ошибка сборки модели)1Falseзависит от командыNone
Необработанная ошибка. Вызов не завершен и не возвращает результатов.2FalseNoneException
Loading table...

Обязательства и оговорки

Начиная с версии dbt Core v1.5, мы берем на себя постоянное обязательство предоставлять Python‑точку входа с функциональным паритетом по отношению к CLI dbt Core. При этом мы оставляем за собой право изменять внутреннюю реализацию, используемую для достижения этой цели. Мы ожидаем, что текущая реализация откроет реальные сценарии использования в краткосрочной и среднесрочной перспективе, пока мы работаем над набором стабильных долгосрочных интерфейсов, которые в конечном итоге ее заменят.

В частности, объекты, возвращаемые каждой командой в dbtRunnerResult.result, не имеют полного контракта и, следовательно, могут изменяться. Некоторые из возвращаемых объектов частично документированы, потому что они частично пересекаются с содержимым артефактов dbt. Как объекты Python, они содержат гораздо больше полей и методов, чем доступно в сериализованных JSON-артефактах. Эти дополнительные поля и методы следует считать внутренними и подверженными изменениям в будущих версиях dbt-core.

Расширенные шаблоны использования

предупреждение

Синтаксис и поддержка этих шаблонов могут измениться в будущих версиях dbt-core.

Цель dbtRunner — предложить паритет с рабочими процессами CLI в программной среде. Существуют несколько расширенных шаблонов использования, которые расширяют возможности CLI.

Повторное использование объектов

Передавайте заранее созданные объекты в dbtRunner, чтобы избежать их повторного создания путем чтения файлов с диска. В настоящее время поддерживается только объект Manifest (содержимое проекта).

from dbt.cli.main import dbtRunner, dbtRunnerResult
from dbt.contracts.graph.manifest import Manifest

# используйте команду 'parse' для загрузки Manifest
res: dbtRunnerResult = dbtRunner().invoke(["parse"])
manifest: Manifest = res.result

# исследуйте manifest
# например, убедитесь, что у каждой публичной модели есть описание
for node in manifest.nodes.values():
if node.resource_type == "model" and node.access == "public":
assert node.description != "", f"{node.name} не имеет описания"

# повторно используйте этот manifest в последующих командах, чтобы пропустить разбор
dbt = dbtRunner(manifest=manifest)
cli_args = ["run", "--select", "tag:my_tag"]
res = dbt.invoke(cli_args)

Регистрация обратных вызовов

Регистрируйте callbacks в EventManager dbt, чтобы получить доступ к структурированным событиям и включить пользовательский логгинг. Текущее поведение обратных вызовов заключается в блокировке последующих шагов; эта функциональность не гарантируется в будущих версиях.

from dbt.cli.main import dbtRunner
from dbt_common.events.base_types import EventMsg

def print_version_callback(event: EventMsg):
if event.info.name == "MainReportVersion":
print(f"Мы рады использовать dbt{event.data.version}")

dbt = dbtRunner(callbacks=[print_version_callback])
dbt.invoke(["list"])

Переопределение параметров

Передавайте параметры в виде именованных аргументов, вместо списка строк в стиле CLI. В настоящее время dbt не будет выполнять никакую проверку или приведение типов для ваших входных данных. Подкоманда должна быть указана в списке в качестве первого позиционного аргумента.

from dbt.cli.main import dbtRunner
dbt = dbtRunner()

# эти команды эквивалентны
dbt.invoke(["--fail-fast", "run", "--select", "tag:my_tag"])
dbt.invoke(["run"], select=["tag:my_tag"], fail_fast=True)

Нашли ошибку?

0
Loading