Использование рабочих процессов Databricks для запуска заданий dbt Cloud
Введение
Использование рабочих процессов Databricks для вызова API заданий dbt Cloud может быть полезным по нескольким причинам:
- Интеграция с другими ETL-процессами — Если вы уже запускаете другие ETL-процессы в Databricks, вы можете использовать рабочий процесс Databricks для запуска задания dbt Cloud после завершения этих процессов.
- Использование функций заданий dbt Cloud — dbt Cloud предоставляет возможность мониторинга прогресса заданий, управления историческими логами и документацией, оптимизации времени выполнения моделей и многого другого.
- Разделение ответственности — Подробные логи для заданий dbt в среде dbt Cloud могут привести к большей модульности и эффективной отладке. Это упрощает изоляцию ошибок, сохраняя возможность видеть общий статус в Databricks.
- Пользовательская активация заданий — Используйте рабочий процесс Databricks для запу ска заданий dbt Cloud на основе пользовательских условий или логики, которые не поддерживаются нативно функцией планирования dbt Cloud. Это может дать вам больше гибкости в отношении времени и способа запуска ваших заданий dbt Cloud.
Предварительные требования
- Активная учетная запись Teams или Enterprise dbt Cloud
- У вас должно быть настроенное и существующее задание развертывания dbt Cloud
- Активная учетная запись Databricks с доступом к рабочему пространству Data Science и Engineering и управлению секретами
- Databricks CLI
- Примечание: Вам нужно только настроить аутентификацию. Как только вы настроите Host и Token и сможете выполни ть
databricks workspace ls /Users/<someone@example.com>
, вы можете продолжить выполнение этого руководства.
- Примечание: Вам нужно только настроить аутентификацию. Как только вы настроите Host и Token и сможете выполни ть
Настройка области секретов Databricks
-
Получите персональный токен доступа или токен учетной записи службы из dbt Cloud.
-
Настройте область секретов Databricks, которая используется для безопасного хранения вашего API-ключа dbt Cloud.
-
Введите следующие команды в вашем терминале:
# В этом примере мы настраиваем область секретов и ключ, называемые "dbt-cloud" и "api-key" соответственно.
databricks secrets create-scope --scope <YOUR_SECRET_SCOPE>
databricks secrets put --scope <YOUR_SECRET_SCOPE> --key <YOUR_SECRET_KEY> --string-value "<YOUR_DBT_CLOUD_API_KEY>"
-
Замените
<YOUR_SECRET_SCOPE>
и<YOUR_SECRET_KEY>
на ваши уникальные идентификаторы. Нажмите здесь для получения дополнительной информации о секретах. -
Замените
<YOUR_DBT_CLOUD_API_KEY>
на фактическое значение API-ключа, который вы скопировали из dbt Cloud на шаге 1.
Создание Python-ноутбука в Databricks
-
Создайте Python-ноутбук в Databricks, который выполняет Python-скрипт, вызывающий API заданий dbt Cloud.
-
Напишите Python-скрипт, который использует библиотеку
requests
для выполнения HTTP POST-запроса к конечной точке API заданий dbt Cloud с использованием необходимых параметров. Вот пример скрипта:
import enum
import os
import time
import json
import requests
from getpass import getpass
dbutils.widgets.text("job_id", "Enter the Job ID")
job_id = dbutils.widgets.get("job_id")
account_id = <YOUR_ACCOUNT_ID>
base_url = "<YOUR_BASE_URL>"
api_key = dbutils.secrets.get(scope = "<YOUR_SECRET_SCOPE>", key = "<YOUR_SECRET_KEY>")
# Эти значения задокументированы в документации API dbt Cloud
class DbtJobRunStatus(enum.IntEnum):
QUEUED = 1
STARTING = 2
RUNNING = 3
SUCCESS = 10
ERROR = 20
CANCELLED = 30
def _trigger_job() -> int:
res = requests.post(
url=f"https://{base_url}/api/v2/accounts/{account_id}/jobs/{job_id}/run/",
headers={'Authorization': f"Token {api_key}"},
json={
# Опционально передайте описание, которое можно просмотреть в API dbt Cloud.
# См. документацию API для дополнительных параметров, которые можно передать,
# включая `schema_override`
'cause': f"Triggered by Databricks Workflows.",
}
)
try:
res.raise_for_status()
except:
print(f"API token (last four): ...{api_key[-4:]}")
raise
response_payload = res.json()
return response_payload['data']['id']
def _get_job_run_status(job_run_id):
res = requests.get(
url=f"https://{base_url}/api/v2/accounts/{account_id}/runs/{job_run_id}/",
headers={'Authorization': f"Token {api_key}"},
)
res.raise_for_status()
response_payload = res.json()
return response_payload['data']['status']
def run():
job_run_id = _trigger_job()
print(f"job_run_id = {job_run_id}")
while True:
time.sleep(5)
status = _get_job_run_status(job_run_id)
print(DbtJobRunStatus(status))
if status == DbtJobRunStatus.SUCCESS:
break
elif status == DbtJobRunStatus.ERROR or status == DbtJobRunStatus.CANCELLED:
raise Exception("Failure!")
if __name__ == '__main__':
run()
-
Замените
<YOUR_SECRET_SCOPE>
и<YOUR_SECRET_KEY>
на значения, которые вы использовали ранее. -
Замените
<YOUR_BASE_URL>
и<YOUR_ACCOUNT_ID>
на правильные значения вашей среды и URL доступа для вашего региона и плана.- Чтобы найти эти значения, перейдите в dbt Cloud, выберите Deploy -> Jobs. Выберите задание, которое вы хотите запустить, и скопируйте URL. Например:
https://YOUR_ACCESS_URL/deploy/000000/projects/111111/jobs/222222
и, следовательно, корректный код будет:
- Чтобы найти эти значения, перейдите в dbt Cloud, выберите Deploy -> Jobs. Выберите задание, которое вы хотите запустить, и скопируйте URL. Например:
Ваш URL структурирован как https://<YOUR_BASE_URL>/deploy/<YOUR_ACCOUNT_ID>/projects/<YOUR_PROJECT_ID>/jobs/<YOUR_JOB_ID>
account_id = 000000
job_id = 222222
base_url = "cloud.getdbt.com"
-
Запустите ноутбук. Он завершится с ошибкой, но вы должны увидеть виджет
job_id
в верхней части вашего ноутбука. -
В виджете введите ваш
job_id
из шага 4. -
Запустите ноутбук снова, чтобы активировать задание dbt Cloud. Ваши результаты должны выглядеть примерно так:
job_run_id = 123456
DbtJobRunStatus.QUEUED
DbtJobRunStatus.QUEUED
DbtJobRunStatus.QUEUED
DbtJobRunStatus.STARTING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.RUNNING
DbtJobRunStatus.SUCCESS
Вы можете отменить задание из dbt Cloud, если это необходимо.