Перейти к основному содержимому

Использование рабочих процессов Databricks для запуска заданий dbt Cloud

Обновлен
Databricks
dbt Core
dbt Cloud
Orchestration
Intermediate
Menu

    Введение

    Использование рабочих процессов Databricks для вызова API заданий dbt Cloud может быть полезным по нескольким причинам:

    1. Интеграция с другими ETL-процессами — Если вы уже запускаете другие ETL-процессы в Databricks, вы можете использовать рабочий процесс Databricks для запуска задания dbt Cloud после завершения этих процессов.
    2. Использование функций заданий dbt Cloud — dbt Cloud предоставляет возможность мониторинга прогресса заданий, управления историческими логами и документацией, оптимизации времени выполнения моделей и многого другого.
    3. Разделение ответственности Подробные логи для заданий dbt в среде dbt Cloud могут привести к большей модульности и эффективной отладке. Это упрощает изоляцию ошибок, сохраняя возможность видеть общий статус в Databricks.
    4. Пользовательская активация заданий — Используйте рабочий процесс Databricks для запуска заданий dbt Cloud на основе пользовательских условий или логики, которые не поддерживаются нативно функцией планирования dbt Cloud. Это может дать вам больше гибкости в отношении времени и способа запуска ваших заданий dbt Cloud.

    Предварительные требования

    Настройка области секретов Databricks

    1. Получите персональный токен доступа или токен учетной записи службы из dbt Cloud.

    2. Настройте область секретов Databricks, которая используется для безопасного хранения вашего API-ключа dbt Cloud.

    3. Введите следующие команды в вашем терминале:

    # В этом примере мы настраиваем область секретов и ключ, называемые "dbt-cloud" и "api-key" соответственно.
    databricks secrets create-scope --scope <YOUR_SECRET_SCOPE>
    databricks secrets put --scope <YOUR_SECRET_SCOPE> --key <YOUR_SECRET_KEY> --string-value "<YOUR_DBT_CLOUD_API_KEY>"
    1. Замените <YOUR_SECRET_SCOPE> и <YOUR_SECRET_KEY> на ваши уникальные идентификаторы. Нажмите здесь для получения дополнительной информации о секретах.

    2. Замените <YOUR_DBT_CLOUD_API_KEY> на фактическое значение API-ключа, который вы скопировали из dbt Cloud на шаге 1.

    Создание Python-ноутбука в Databricks

    1. Создайте Python-ноутбук в Databricks, который выполняет Python-скрипт, вызывающий API заданий dbt Cloud.

    2. Напишите Python-скрипт, который использует библиотеку requests для выполнения HTTP POST-запроса к конечной точке API заданий dbt Cloud с использованием необходимых параметров. Вот пример скрипта:

    import enum
    import os
    import time
    import json
    import requests
    from getpass import getpass

    dbutils.widgets.text("job_id", "Enter the Job ID")
    job_id = dbutils.widgets.get("job_id")

    account_id = <YOUR_ACCOUNT_ID>
    base_url = "<YOUR_BASE_URL>"
    api_key = dbutils.secrets.get(scope = "<YOUR_SECRET_SCOPE>", key = "<YOUR_SECRET_KEY>")

    # Эти значения задокументированы в документации API dbt Cloud
    class DbtJobRunStatus(enum.IntEnum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30

    def _trigger_job() -> int:
    res = requests.post(
    url=f"https://{base_url}/api/v2/accounts/{account_id}/jobs/{job_id}/run/",
    headers={'Authorization': f"Token {api_key}"},
    json={
    # Опционально передайте описание, которое можно просмотреть в API dbt Cloud.
    # См. документацию API для дополнительных параметров, которые можно передать,
    # включая `schema_override`
    'cause': f"Triggered by Databricks Workflows.",
    }
    )

    try:
    res.raise_for_status()
    except:
    print(f"API token (last four): ...{api_key[-4:]}")
    raise

    response_payload = res.json()
    return response_payload['data']['id']

    def _get_job_run_status(job_run_id):
    res = requests.get(
    url=f"https://{base_url}/api/v2/accounts/{account_id}/runs/{job_run_id}/",
    headers={'Authorization': f"Token {api_key}"},
    )

    res.raise_for_status()
    response_payload = res.json()
    return response_payload['data']['status']

    def run():
    job_run_id = _trigger_job()
    print(f"job_run_id = {job_run_id}")
    while True:
    time.sleep(5)
    status = _get_job_run_status(job_run_id)
    print(DbtJobRunStatus(status))
    if status == DbtJobRunStatus.SUCCESS:
    break
    elif status == DbtJobRunStatus.ERROR or status == DbtJobRunStatus.CANCELLED:
    raise Exception("Failure!")

    if __name__ == '__main__':
    run()
    1. Замените <YOUR_SECRET_SCOPE> и <YOUR_SECRET_KEY> на значения, которые вы использовали ранее.

    2. Замените <YOUR_BASE_URL> и <YOUR_ACCOUNT_ID> на правильные значения вашей среды и URL доступа для вашего региона и плана.

      • Чтобы найти эти значения, перейдите в dbt Cloud, выберите Deploy -> Jobs. Выберите задание, которое вы хотите запустить, и скопируйте URL. Например: https://YOUR_ACCESS_URL/deploy/000000/projects/111111/jobs/222222 и, следовательно, корректный код будет:

    Ваш URL структурирован как https://<YOUR_BASE_URL>/deploy/<YOUR_ACCOUNT_ID>/projects/<YOUR_PROJECT_ID>/jobs/<YOUR_JOB_ID> account_id = 000000 job_id = 222222 base_url = "cloud.getdbt.com"

    1. Запустите ноутбук. Он завершится с ошибкой, но вы должны увидеть виджет job_id в верхней части вашего ноутбука.

    2. В виджете введите ваш job_id из шага 4.

    3. Запустите ноутбук снова, чтобы активировать задание dbt Cloud. Ваши результаты должны выглядеть примерно так:

    job_run_id = 123456
    DbtJobRunStatus.QUEUED
    DbtJobRunStatus.QUEUED
    DbtJobRunStatus.QUEUED
    DbtJobRunStatus.STARTING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.RUNNING
    DbtJobRunStatus.SUCCESS

    Вы можете отменить задание из dbt Cloud, если это необходимо.

    Настройка рабочих процессов для запуска заданий dbt Cloud

    Вы можете настроить рабочие процессы непосредственно из ноутбука ИЛИ добавив этот ноутбук в один из ваших существующих рабочих процессов:

    1. Нажмите Schedule в правой верхней части страницы
    2. Нажмите Add a schedule
    3. Настройте имя задания, расписание, кластер
    4. Добавьте новый параметр с именем: job_id и заполните ваш ID задания. Обратитесь к шагу 4 в предыдущем разделе, чтобы найти ваш ID задания.
    5. Нажмите Create
    6. Нажмите Run Now, чтобы протестировать задание

    Несколько задач Workflow могут быть настроены с использованием одного и того же ноутбука путем настройки параметра job_id для указания на разные задания dbt Cloud.

    Использование рабочих процессов Databricks для доступа к API заданий dbt Cloud может улучшить интеграцию ваших процессов обработки данных и позволить планировать более сложные рабочие процессы.

    0