Использование Databricks workflows для запуска dbt jobs
Введение
Использование рабочих процессов Databricks для вызова API заданий dbt может быть полезно по нескольким причинам:
- Интеграция с другими ETL‑процессами — Если вы уже запускаете другие ETL‑процессы в Databricks, вы можете использовать рабочий процесс Databricks для запуска задания dbt после завершения этих процессов.
- Использование возможностей заданий dbt — dbt предоставляет возможности мониторинга выполнения заданий, управления историческими логами и документацией, оптимизации времени выполнения моделей и многого другого.
- Разделение ответственности — Подробные логи заданий dbt в среде dbt позволяют добиться большей модульности и более эффективной отладки. Это упрощает быструю изоляцию ошибок, при этом сохраняя возможность видеть общий статус выполнения в Databricks.
- Пользовательский запуск заданий — Используйте рабочий процесс Databricks для запуска заданий dbt на основе пользовательских условий или логики, которые не поддерживаются нативно механизмом планирования dbt. Это дает больше гибкости в том, когда и как запускаются ваши задания dbt.
Предварительные требования
- Активная учетная запись Enterprise или Enterprise+ dbt
- У вас должна быть настроена и существующая задача деплоя dbt
- Активная учетная запись Databricks с доступом к рабочему пространству Data Science and Engineering и к разделу Manage secrets
- Databricks CLI
- Примечание: Вам нужно только настроить аутентификацию. Как только вы настроите Host и Token и сможете выполнить
databricks workspace ls /Users/<someone@example.com>, вы можете продолжить выполнение этого руководства.
- Примечание: Вам нужно только настроить аутентификацию. Как только вы настроите Host и Token и сможете выполнить
Настройка области секретов Databricks
-
Получите personal access token или Service account token в dbt.
-
Настройте Databricks secret scope, который используется для безопасного хранения вашего API‑ключа dbt.
-
Введите следующие команды в вашем терминале:
# В этом примере мы настраиваем область секретов и ключ, называемые "dbt-cloud" и "api-key" соответственно.
databricks secrets create-scope --scope <YOUR_SECRET_SCOPE>
databricks secrets put --scope <YOUR_SECRET_SCOPE> --key <YOUR_SECRET_KEY> --string-value "<YOUR_DBT_CLOUD_API_KEY>"
-
Замените
<YOUR_SECRET_SCOPE>и<YOUR_SECRET_KEY>на ваши уникальные идентификаторы. Нажмите здесь для получения дополнительной информации о секретах. -
Замените
<YOUR_DBT_CLOUD_API_KEY>на фактическое значение API‑ключа, который вы скопировали из dbt на шаге 1.
Создание Python-ноутбука в Databricks
-
Создайте Python-ноутбук в Databricks, который выполняет Python-скрипт, вызывающий API заданий dbt Cloud.
-
Создайте Databricks Python notebook, который выполняет Python‑скрипт, вызывающий API заданий dbt.
-
Напишите Python‑скрипт, который использует библиотеку
requestsдля выполнения HTTP POST‑запроса к endpoint API заданий dbt с использованием необходимых параметров. Ниже приведён пример такого скрипта:
import enum
import os
import time
import json
import requests
from getpass import getpass
dbutils.widgets.text("job_id", "Enter the Job ID")
job_id = dbutils.widgets.get("job_id")
account_id = <YOUR_ACCOUNT_ID>
base_url = "<YOUR_BASE_URL>"
api_key = dbutils.secrets.get(scope = "<YOUR_SECRET_SCOPE>", key = "<YOUR_SECRET_KEY>")
# Это задокументировано в документации API dbt
class DbtJobRunStatus(enum.IntEnum):
QUEUED = 1
STARTING = 2
RUNNING = 3
SUCCESS = 10
ERROR = 20
CANCELLED = 30
def _trigger_job() -> int:
res = requests.post(
url=f"https://{base_url}/api/v2/accounts/{account_id}/jobs/{job_id}/run/",
headers={'Authorization': f"Token {api_key}"},
json={
# При необходимости можно передать описание, которое будет отображаться в API <Constant name="cloud" />.
# См. документацию по API для информации о дополнительных параметрах, которые можно передать,
# включая `schema_override`
'cause': f"Triggered by Databricks Workflows.",
}
)
try:
res.raise_for_status()
except:
print(f"API token (last four): ...{api_key[-4:]}")
raise
response_payload = res.json()
return response_payload['data']['id']
def _get_job_run_status(job_run_id):
res = requests.get(
url=f"https://{base_url}/api/v2/accounts/{account_id}/runs/{job_run_id}/",
headers={'Authorization': f"Token {api_key}"},
)
res.raise_for_status()
response_payload = res.json()
return response_payload['data']['status']
def run():
job_run_id = _trigger_job()
print(f"job_run_id = {job_run_id}")
while True:
time.sleep(5)
status = _get_job_run_status(job_run_id)
print(DbtJobRunStatus(status))
if status == DbtJobRunStatus.SUCCESS:
break
elif status == DbtJobRunStatus.ERROR or status == DbtJobRunStatus.CANCELLED:
raise Exception("Failure!")
if __name__ == '__main__':
run()
-
Замените
<YOUR_SECRET_SCOPE>и<YOUR_SECRET_KEY>на значения, которые вы использовали ранее. -
Замените
<YOUR_BASE_URL>и<YOUR_ACCOUNT_ID>на корректные значения для вашего окружения и Access URL для вашего региона и тарифного плана.- Чтобы найти эти значения, перейдите в dbt, выберите Deploy -> Jobs. Выберите задание (Job), которое хотите запустить, и скопируйте URL. Например:
https://YOUR_ACCESS_URL/deploy/000000/projects/111111/jobs/222222— тогда корректный код будет таким:
- Чтобы найти эти значения, перейдите в dbt, выберите Deploy -> Jobs. Выберите задание (Job), которое хотите запустить, и скопируйте URL. Например:
Ваш URL имеет вид https://<YOUR_BASE_URL>/deploy/<YOUR_ACCOUNT_ID>/projects/<YOUR_PROJECT_ID>/jobs/<YOUR_JOB_ID>
account_id = 000000
job_id = 222222
base_url = "cloud.getdbt.com"
-
Запустите ноутбук. Он завершится с ошибкой, но вы должны увидеть виджет
job_idв верхней части вашего ноутбука. -
В виджете введите ваш
job_idиз шага 4. -
Запустите Notebook ещё раз, чтобы инициировать задание dbt. Результаты должны выглядеть примерно следующим образом:
job_run_id = 123456
DbtJobRunStatus.QUEUED
DbtJobRunStatus.QUEUED
DbtJobRunStatus.QUEUED
DbtJobRunStatus.STARTING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.SUCCESS
Вы можете при необходимости отменить задание в dbt.
Настройка рабочих процессов для запуска dbt jobs
Вы можете настроить рабочие процессы непосредственно из ноутбука ИЛИ добавив этот ноутбук в один из ваших существующих рабочих процессов:
- Создать рабочий процесс из существующего ноутбука
- Добавить ноутбук в существующий рабочий процесс
- Нажмите Schedule в правой верхней части страницы
- Нажмите Add a schedule
- Настройте имя задания, расписание, кластер
- Добавьте новый параметр с именем:
job_idи заполните ваш ID задания. Обратитесь к шагу 4 в предыдущем разделе, чтобы найти ваш ID задания. - Нажмите Create
- Нажмите Run Now, чтобы протестировать задание
- Откройте существующий Workflow
- Нажмите Tasks
- Нажмите значок “+”, чтобы добавить новую задачу
- Введите следующее:
| Loading table... |
- Выберите Save Task
- Нажмите Run Now, чтобы протестировать рабочий процесс
Несколько задач Workflow можно настроить с использованием одного и того же ноутбука, если сконфигурировать параметр job_id так, чтобы он указывал на разные задания dbt.
Использование Databricks workflows для доступа к API заданий dbt позволяет улучшить интеграцию процессов вашего конвейера данных и дает возможность планировать более сложные рабочие процессы.