Перейти к основному содержимому

Использование Databricks workflows для запуска dbt jobs

Databricks
dbt Core
dbt platform
Orchestration
Intermediate
Menu

    Введение

    Использование рабочих процессов Databricks для вызова API заданий dbt может быть полезно по нескольким причинам:

    1. Интеграция с другими ETL‑процессами — Если вы уже запускаете другие ETL‑процессы в Databricks, вы можете использовать рабочий процесс Databricks для запуска задания dbt после завершения этих процессов.
    2. Использование возможностей заданий dbt dbt предоставляет возможности мониторинга выполнения заданий, управления историческими логами и документацией, оптимизации времени выполнения моделей и многого другого.
    3. Разделение ответственности — Подробные логи заданий dbt в среде dbt позволяют добиться большей модульности и более эффективной отладки. Это упрощает быструю изоляцию ошибок, при этом сохраняя возможность видеть общий статус выполнения в Databricks.
    4. Пользовательский запуск заданий — Используйте рабочий процесс Databricks для запуска заданий dbt на основе пользовательских условий или логики, которые не поддерживаются нативно механизмом планирования dbt. Это дает больше гибкости в том, когда и как запускаются ваши задания dbt.

    Предварительные требования

    • Активная учетная запись Enterprise или Enterprise+ dbt
    • У вас должна быть настроена и существующая задача деплоя dbt
    • Активная учетная запись Databricks с доступом к рабочему пространству Data Science and Engineering и к разделу Manage secrets
    • Databricks CLI
      • Примечание: Вам нужно только настроить аутентификацию. Как только вы настроите Host и Token и сможете выполнить databricks workspace ls /Users/<someone@example.com>, вы можете продолжить выполнение этого руководства.

    Настройка области секретов Databricks

    1. Получите personal access token или Service account token в dbt.

    2. Настройте Databricks secret scope, который используется для безопасного хранения вашего API‑ключа dbt.

    3. Введите следующие команды в вашем терминале:

    # В этом примере мы настраиваем область секретов и ключ, называемые "dbt-cloud" и "api-key" соответственно.
    databricks secrets create-scope --scope <YOUR_SECRET_SCOPE>
    databricks secrets put --scope <YOUR_SECRET_SCOPE> --key <YOUR_SECRET_KEY> --string-value "<YOUR_DBT_CLOUD_API_KEY>"
    1. Замените <YOUR_SECRET_SCOPE> и <YOUR_SECRET_KEY> на ваши уникальные идентификаторы. Нажмите здесь для получения дополнительной информации о секретах.

    2. Замените <YOUR_DBT_CLOUD_API_KEY> на фактическое значение API‑ключа, который вы скопировали из dbt на шаге 1.

    Создание Python-ноутбука в Databricks

    1. Создайте Python-ноутбук в Databricks, который выполняет Python-скрипт, вызывающий API заданий dbt Cloud.

    2. Создайте Databricks Python notebook, который выполняет Python‑скрипт, вызывающий API заданий dbt.

    3. Напишите Python‑скрипт, который использует библиотеку requests для выполнения HTTP POST‑запроса к endpoint API заданий dbt с использованием необходимых параметров. Ниже приведён пример такого скрипта:

    import enum
    import os
    import time
    import json
    import requests
    from getpass import getpass

    dbutils.widgets.text("job_id", "Enter the Job ID")
    job_id = dbutils.widgets.get("job_id")

    account_id = <YOUR_ACCOUNT_ID>
    base_url = "<YOUR_BASE_URL>"
    api_key = dbutils.secrets.get(scope = "<YOUR_SECRET_SCOPE>", key = "<YOUR_SECRET_KEY>")

    # Это задокументировано в документации API dbt
    class DbtJobRunStatus(enum.IntEnum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30

    def _trigger_job() -> int:
    res = requests.post(
    url=f"https://{base_url}/api/v2/accounts/{account_id}/jobs/{job_id}/run/",
    headers={'Authorization': f"Token {api_key}"},
    json={
    # При необходимости можно передать описание, которое будет отображаться в API <Constant name="cloud" />.
    # См. документацию по API для информации о дополнительных параметрах, которые можно передать,
    # включая `schema_override`
    'cause': f"Triggered by Databricks Workflows.",
    }
    )

    try:
    res.raise_for_status()
    except:
    print(f"API token (last four): ...{api_key[-4:]}")
    raise

    response_payload = res.json()
    return response_payload['data']['id']

    def _get_job_run_status(job_run_id):
    res = requests.get(
    url=f"https://{base_url}/api/v2/accounts/{account_id}/runs/{job_run_id}/",
    headers={'Authorization': f"Token {api_key}"},
    )

    res.raise_for_status()
    response_payload = res.json()
    return response_payload['data']['status']

    def run():
    job_run_id = _trigger_job()
    print(f"job_run_id = {job_run_id}")
    while True:
    time.sleep(5)
    status = _get_job_run_status(job_run_id)
    print(DbtJobRunStatus(status))
    if status == DbtJobRunStatus.SUCCESS:
    break
    elif status == DbtJobRunStatus.ERROR or status == DbtJobRunStatus.CANCELLED:
    raise Exception("Failure!")

    if __name__ == '__main__':
    run()
    1. Замените <YOUR_SECRET_SCOPE> и <YOUR_SECRET_KEY> на значения, которые вы использовали ранее.

    2. Замените <YOUR_BASE_URL> и <YOUR_ACCOUNT_ID> на корректные значения для вашего окружения и Access URL для вашего региона и тарифного плана.

      • Чтобы найти эти значения, перейдите в dbt, выберите Deploy -> Jobs. Выберите задание (Job), которое хотите запустить, и скопируйте URL. Например: https://YOUR_ACCESS_URL/deploy/000000/projects/111111/jobs/222222 — тогда корректный код будет таким:

    Ваш URL имеет вид https://<YOUR_BASE_URL>/deploy/<YOUR_ACCOUNT_ID>/projects/<YOUR_PROJECT_ID>/jobs/<YOUR_JOB_ID> account_id = 000000 job_id = 222222 base_url = "cloud.getdbt.com"

    1. Запустите ноутбук. Он завершится с ошибкой, но вы должны увидеть виджет job_id в верхней части вашего ноутбука.

    2. В виджете введите ваш job_id из шага 4.

    3. Запустите Notebook ещё раз, чтобы инициировать задание dbt. Результаты должны выглядеть примерно следующим образом:

    job_run_id = 123456
    DbtJobRunStatus.QUEUED
    DbtJobRunStatus.QUEUED
    DbtJobRunStatus.QUEUED
    DbtJobRunStatus.STARTING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.SUCCESS

    Вы можете при необходимости отменить задание в dbt.

    Настройка рабочих процессов для запуска dbt jobs

    Вы можете настроить рабочие процессы непосредственно из ноутбука ИЛИ добавив этот ноутбук в один из ваших существующих рабочих процессов:

    1. Нажмите Schedule в правой верхней части страницы
    2. Нажмите Add a schedule
    3. Настройте имя задания, расписание, кластер
    4. Добавьте новый параметр с именем: job_id и заполните ваш ID задания. Обратитесь к шагу 4 в предыдущем разделе, чтобы найти ваш ID задания.
    5. Нажмите Create
    6. Нажмите Run Now, чтобы протестировать задание

    Несколько задач Workflow можно настроить с использованием одного и того же ноутбука, если сконфигурировать параметр job_id так, чтобы он указывал на разные задания dbt.

    Использование Databricks workflows для доступа к API заданий dbt позволяет улучшить интеграцию процессов вашего конвейера данных и дает возможность планировать более сложные рабочие процессы.

    Нашли ошибку?

    0