Сценарии использования и примеры для Discovery API
С помощью Discovery API вы можете запрашивать метаданные в dbt Cloud, чтобы узнать больше о ваших развертываниях dbt и данных, которые они генерируют, для их анализа и улучшения.
Вы можете использовать API различными способами, чтобы получить ответы на ваши бизнес-вопросы. Ниже описаны некоторые из способов использования API, чтобы дать вам представление о вопросах, на которые этот API может помочь ответить.
Сценарий использования | Результат | Примеры вопросов |
---|---|---|
Производительность | Определите неэффективности в выполнении конвейера, чтобы снизить затраты на инфраструктуру и улучшить своевременность. |
|
Качество | Мониторинг свежести источников данных и результатов тестов для решения проблем и повышения доверия к данным. |
|
Обнаружение | Найдит е и поймите соответствующие наборы данных и семантические узлы с богатым контекстом и метаданными. |
|
Управление | Аудит разработки данных и содействие сотрудничеству внутри и между командами. |
|
Разработка | Понимание изменений и использования наборов данных и оценка влияния для информирования определения проекта. |
|
Производительность
Вы можете использовать Discovery API для выявления неэффективностей в выполнении конвейера, чтобы снизить затраты на инфраструктуру и улучшить своевременность. Ниже приведены примеры вопросов и запросов, которые вы можете выполнить.
Для сценариев использования, связанных с производительностью, обычно запрашивают историческое или последнее примененное состояние в любой части DAG (например, модели) с использованием конечных точек environment
, modelByEnvironment
или на уровне задания.
Сколько времени заняло выполнение каждой модели?
Полезно понять, сколько времени требуется для построения моделей (таблиц) и выполнения тестов во время выполнения dbt. Более длительное время построения моделей приводит к более высоким затратам на инфраструктуру и более позднему поступлению свежих данных к заинтересованным сторонам. Такие анализы мог ут проводиться в инструментах наблюдаемости или в ad-hoc запросах, например, в блокноте.
Пример запроса с кодом
Команды данных могут отслеживать производительность своих моделей, выявлять узкие места и оптимизировать общий конвейер данных, получая такие детали выполнения, как executionTime
и runElapsedTime
:
- Используйте API на уровне последнего состояния окружения, чтобы получить список всех выполненных моделей и их время выполнения. Затем отсортируйте модели по
executionTime
в порядке убывания.
query AppliedModels($environmentId: BigInt!, $first: Int!) {
environment(id: $environmentId) {
applied {
models(first: $first) {
edges {
node {
name
uniqueId
materializedType
executionInfo {
lastSuccessRunId
executionTime
executeStartedAt
}
}
}
}
}
}
}
- Получите последние 20 результатов выполнения для самой длительно выполняемой модели. Просмотрите результаты модели по выполненным запускам или перейдите к самому заданию/запуску или коммиту для дальнейшего исследования.
query ModelHistoricalRuns(
$environmentId: BigInt!
$uniqueId: String
$lastRunCount: Int
) {
environment(id: $environmentId) {
applied {
modelHistoricalRuns(
uniqueId: $uniqueId
lastRunCount: $lastRunCount
) {
name
runId
runElapsedTime
runGeneratedAt
executionTime
executeStartedAt
executeCompletedAt
status
}
}
}
}
- Используйте результаты запроса для построения графика исторического времени выполнения и трендов времени выполнения самой длительно выполняемой модели.
# Импорт библиотек
import os
import matplotlib.pyplot as plt
import pandas as pd
import requests
# Установите API-ключ
auth_token = *[SERVICE_TOKEN_HERE]*
# Запросите API
def query_discovery_api(auth_token, gql_query, variables):
response = requests.post('https://metadata.cloud.getdbt.com/graphql',
headers={"authorization": "Bearer "+auth_token, "content-type": "application/json"},
json={"query": gql_query, "variables": variables})
data = response.json()['data']
return data
# Получите последние метаданные выполнения для всех моделей
models_latest_metadata = query_discovery_api(auth_token, query_one, variables_query_one)['environment']
# Преобразуйте в dataframe
models_df = pd.DataFrame([x['node'] for x in models_latest_metadata['applied']['models']['edges']])
# Разверните столбец executionInfo
models_df = pd.concat([models_df.drop(['executionInfo'], axis=1), models_df['executionInfo'].apply(pd.Series)], axis=1)
# Отсортируйте модели по времени выполнения
models_df_sorted = models_df.sort_values('executionTime', ascending=False)
print(models_df_sorted)
# Получите uniqueId самой длительно выполняемой модели
longest_running_model = models_df_sorted.iloc[0]['uniqueId']
# Определите переменные второго запроса
variables_query_two = {
"environmentId": *[ENVR_ID_HERE]*
"lastRunCount": 10,
"uniqueId": longest_running_model
}
# Получите исторические метаданные выполнения для самой длительно выполняемой модели
model_historical_metadata = query_discovery_api(auth_token, query_two, variables_query_two)['environment']['applied']['modelHistoricalRuns']
# Преобразуйте в dataframe
model_df = pd.DataFrame(model_historical_metadata)
# Отфильтруйте dataframe, оставив только успешные выполнения
model_df = model_df[model_df['status'] == 'success']
# Преобразуйте столбцы runGeneratedAt, executeStartedAt и executeCompletedAt в datetime
model_df['runGeneratedAt'] = pd.to_datetime(model_df['runGeneratedAt'])
model_df['executeStartedAt'] = pd.to_datetime(model_df['executeStartedAt'])
model_df['executeCompletedAt'] = pd.to_datetime(model_df['executeCompletedAt'])
# Постройте график runElapsedTime по времени
plt.plot(model_df['runGeneratedAt'], model_df['runElapsedTime'])
plt.title('Run Elapsed Time')
plt.show()
# Постройте график executionTime по времени
plt.plot(model_df['executeStartedAt'], model_df['executionTime'])
plt.title(model_df['name'].iloc[0]+" Execution Time")
plt.show()
Примеры графиков:
Каково последнее состояние каждой модели?
Discovery API предоставляет информацию о примененном состоянии моделей и о том, как они достигли этого состояния. Вы можете получить информацию о статусе из последнего выполнения и последнего успешного выполнения (execution) из конечной точки environment
и углубиться в исторические выполнения, используя конечные точки на основе заданий и modelByEnvironment
.
Пример запроса
API возвращает полную информацию об идентификаторе (database.schema.alias
) и executionInfo
как для последнего выполнения, так и для последнего успешного выполнения из базы данных:
query ($environmentId: BigInt!, $first: Int!) {
environment(id: $environmentId) {
applied {
models(first: $first) {
edges {
node {
uniqueId
compiledCode
database
schema
alias
materializedType
executionInfo {
executeCompletedAt
lastJobDefinitionId
lastRunGeneratedAt
lastRunId
lastRunStatus
lastRunError
lastSuccessJobDefinitionId
runGeneratedAt
lastSuccessRunId
}
}
}
}
}
}
}