Перейти к основному содержимому

Духовное согласование dbt + Airflow

· 12 мин. чтения
Sung Won Chung

Airflow и dbt часто рассматриваются как взаимоисключающие:

Вы либо строите SQL-трансформации, используя SQL-операторы базы данных Airflow (например, SnowflakeOperator), либо разрабатываете их в проекте dbt.

Вы либо оркестрируете модели dbt в Airflow, либо развертываете их, используя dbt Cloud.

На мой взгляд, это ложные дихотомии, которые звучат эффектно, но на самом деле не помогают нам выполнять нашу работу как специалистам по данным.

В своей практике в качестве консультанта по данным и теперь как член команды архитектуры решений dbt Labs, я часто видел, как Airflow, dbt Core и dbt Cloud (через официальный провайдер) комбинируются по мере необходимости, в зависимости от потребностей конкретного конвейера данных или структуры и навыков команды.

Более фундаментально, я считаю важным отметить, что Airflow + dbt духовно согласованы в своей цели. Они оба существуют для облегчения четкой коммуникации между командами данных, в служении производству надежных данных.

Давайте углубимся в это духовное согласование, рассмотрим несколько случаев, когда они хорошо работают вместе, а затем погрузимся в детали, чтобы определить, какая комбинация Airflow + dbt может быть подходящей для вашей команды.

Где Airflow + dbt согласуются

Давайте рассмотрим гипотетический сценарий, с которым я сталкивался как консультант, чтобы проиллюстрировать, как Airflow + dbt работают на параллельной духовной волне.

Кратко: они оба предоставляют общие интерфейсы, которые команды данных могут использовать, чтобы быть на одной волне.

Тонкости того, когда я находил каждый из них полезным (только Airflow, Airflow с dbt Core или Cloud, только dbt Core или Cloud), заключаются в том, какие члены команды должны быть на одной волне — об этом я расскажу в следующем разделе.

Со стороны Airflow

У клиента есть 100 конвейеров данных, работающих через cron job в виртуальной машине GCP (Google Cloud Platform) каждый день в 8 утра.

Это было просто настроить, но затем начались вопросы:

  • "Куда я буду помещать логи?" В корзину Google Cloud Storage.
  • "Где я могу просмотреть историю в формате ?" Давайте экспортируем события логов в BigQuery.
  • "Мне нужно создать оповещения о логах, чтобы уведомлять людей о сбоях." Давайте используем оповещения GCP для отправки писем.
  • "Когда что-то идет не так, как перезапустить с точки сбоя?" Давайте изменим производственный скрипт.

Со временем вы начинаете строить множество компонентов, которые Airflow предоставляет из коробки.

Но что делает вас настоящим инженером данных — это тонкая настройка логирования и обеспечение работы базовой инфраструктуры вашего конвейера, или это получение надежных данных для людей, с которыми вы работаете?

Airflow решает те же проблемы, но в публично проверяемом и надежном виде — он предоставляет общий интерфейс, с помощью которого команды данных могут быть на одной волне относительно общего состояния здоровья конвейера данных. И этот общий интерфейс настраивается в коде и контролируется версией.

Со стороны dbt

Этот конвейер включал множество трансформаций данных, построенных различными способами.

Они часто писались в виде простых скриптов на Python, которые только выполняли SQL-запрос и записывали данные в BigQuery. Эти SQL-скрипты, подобные хранимым процедурам, требовали:

  • Написания шаблонного (CREATE TABLE и т.д. * 1000)
  • Управления именами схем между производственной и дев-средами
  • Ручного управления зависимостями между скриптами

Снова, это довольно легко настроить, но это не решает основную задачу: получение надежных данных для людей, которые вам важны.

Это как бы работает, но вам нужно, чтобы это действительно работало в публично наблюдаемом и проверяемом виде через тестирование.

Я никогда не встречал клиента, который писал бы скрипт для автоматической генерации DDL или писал бы шаблонные тесты для SQL — никто не хочет, чтобы это было их работой.

Так что, как и Airflow для оркестрации конвейеров, dbt делает это из коробки для слоя трансформации данных.

dbt предоставляет общий интерфейс, где команды данных могут быть на одной волне относительно бизнес-логики и статуса выполнения трансформаций данных — снова, в виде, который настраивается в коде и контролируется версией.

Если вам интересно узнать о пути миграции от рабочего процесса на основе хранимых процедур к модульному моделированию данных в dbt, ознакомьтесь с работами моих коллег Шона МакИнтайра и Пэта Кернса о миграции к ELT-конвейеру.

Заметка о ролях команд данных для Airflow + dbt

На мой взгляд, эти технические решения также сводятся к структуре команды данных, которую вы строите, и конкретно к навыкам и обучению, заложенным в эту структуру.

Инструменты дешевы по сравнению с наймом и обучением, поэтому я чаще всего видел, что решения о выборе инструментов принимаются на основе доступности персонала и поддержки обучения, а не технических характеристик или функций самих инструментов. Давайте заглянем, какие роли требуются для работы с dbt и Airflow (эти же навыки также примерно соответствуют любому другому инструменту оркестрации).

Многие из нас по-разному определяют роли, такие как инженер данных, инженер аналитики и аналитик данных.

Поэтому вместо того, чтобы зацикливаться на определении ролей, давайте сосредоточимся на практических навыках, которые я видел на практике.

требуемые навыки для airflow и dbt

Общие навыки, необходимые для реализации любого варианта dbt (Core или Cloud), включают:

YAML и Jinja можно выучить довольно быстро, но SQL — это обязательный навык, который вам понадобится для начала.

Навыки SQL обычно разделяются между специалистами по данным и инженерами, что делает трансформации на основе SQL (как в dbt) подходящим общим интерфейсом для сотрудничества.

Чтобы добавить Airflow, вам понадобятся более инженерные навыки в области программного обеспечения или инфраструктуры для построения и развертывания ваших конвейеров: Python, Docker, Bash (для использования CLI Airflow), Kubernetes, Terraform и управление секретами.

Как Airflow + dbt хорошо работают вместе

Зная, что этот набор инструментов (Airflow + dbt) удовлетворяет те же духовные потребности (публичная наблюдаемость, конфигурация как код, контроль версий и т.д.), как можно решить, когда и где их развернуть?

Это та же самая чувствительность, выраженная в точке зрения dbt в 2016 году, наиболее близкой к основополагающему блогу для dbt.

Я обычно думаю о том, как я хочу, чтобы моя работа выглядела, когда что-то идет не так — готов ли я к отладке, и понятно ли, кому передать эстафету для исправления проблемы (если это не я)?

Пара примеров:

Наблюдаемость конвейера для аналитиков

Если пользователи dbt в вашей команде — это аналитики, а не инженеры, им все равно может понадобиться возможность углубиться в коренную причину сбоя теста свежести источника dbt.

Если ваши задачи извлечения и загрузки настроены в Airflow, аналитики могут открыть интерфейс Airflow для мониторинга проблем (как они бы сделали в инструменте ETL на основе GUI), вместо того чтобы открывать тикет или беспокоить инженера в Slack. Интерфейс Airflow предоставляет общий интерфейс, который аналитики могут использовать для самостоятельного обслуживания, до момента, когда нужно предпринять действия.

панель управления airflow

Наблюдаемость трансформаций для инженеров

Когда выполнение dbt завершается сбоем в конвейере Airflow, инженер, отслеживающий общий конвейер, вероятно, не будет иметь бизнес-контекста, чтобы понять, почему отдельная модель или тест завершились сбоем — они, вероятно, не были теми, кто их создавал.

dbt предоставляет общие программные интерфейсы (API администратора и метаданных dbt Cloud и артефакты на основе .json в случае dbt Core), которые предоставляют контекст, необходимый инженеру для самостоятельного обслуживания — либо путем повторного запуска с точки сбоя, либо обращения к владельцу.

Почему я ❤️ dbt Cloud + Airflow

dbt Core — это фантастическая платформа для разработки логики трансформации данных и тестирования. Она менее фантастична как общий интерфейс для совместной работы аналитиков данных и инженеров на производственных запусках трансформационных задач.

Команда dbt Labs и команда Astronomer усердно работают над совместной разработкой некоторых опций для dbt Core и нового провайдера dbt Cloud для тех, кто использует dbt Cloud, который готов к использованию всеми пользователями OSS Airflow. Лучший выбор для вас будет зависеть от таких факторов, как доступные ресурсы вашей команды, сложность вашего случая использования и как долго может потребоваться поддержка вашей реализации.

Этот инструмент подхватывает эстафету и предоставляет общий интерфейс, где команды могут настраивать запуски и отлаживать проблемы в производственных задачах.

Если вы производите свои запуски dbt в Airflow, используя оператор dbt Core, вы сталкиваетесь с той же проблемой SQL, обернутого в Python, о которой я упоминал в начале: аналитик, который создал логику трансформации, не знает о рабочем процессе производственного запуска, что духовно является тем, чего мы пытаемся избежать здесь.

dbt Core + Airflow

Давайте рассмотрим пример из Airflow-конвейера dbt_full_refresh GitLab.

Если эта задача завершается сбоем в конвейере Airflow, существует ряд аспектов конвейера, которые нужно отладить: была ли это проблема с Kubernetes или секретами, образом Docker или самим кодом трансформации dbt?

Аналитик будет в неведении при попытке отладки этого и будет нуждаться в помощи инженера, чтобы выяснить, в чем проблема, если она связана с dbt.

Это может быть вполне приемлемо, если ваша команда данных структурирована так, чтобы инженеры данных исключительно занимались моделированием dbt, но это довольно редкий шаблон структуры организации, который я видел. И если у вас есть простые решения для этой проблемы слепоты аналитиков, я был бы рад их услышать.

После того как данные были загружены, dbt Core может быть использован для их моделирования для потребления. Большинство пользователей выбирают один из следующих вариантов: Использовать CLI dbt Core + BashOperator с Airflow (если вы выберете этот путь, вы можете использовать внешний менеджер секретов для управления учетными данными внешне), или Использовать KubernetesPodOperator для каждой задачи dbt, как это делают команды данных в таких местах, как Gitlab и Snowflake.

Оба подхода одинаково допустимы; правильный будет зависеть от команды и конкретного случая использования.

Управление зависимостямиНакладные расходыГибкостьНакладные расходы на инфраструктуру
dbt Core CLI + BashOperatorСреднийНизкийСреднийНизкий
Kubernetes Pod OperatorОчень простойСреднийВысокийСредний

Если у вас есть ресурсы DevOps и ваша команда комфортно работает с такими концепциями, как поды Kubernetes и контейнеры, вы можете использовать KubernetesPodOperator для запуска каждой задачи в образе Docker, чтобы вам никогда не приходилось думать о зависимостях Python. Более того, вы создадите библиотеку образов, содержащих ваши модели dbt, которые могут быть запущены в любой контейнеризированной среде. Однако настройка сред разработки, CI/CD и управление массивами контейнеров могут означать значительные накладные расходы для некоторых команд. Инструменты, такие как astro-cli, могут облегчить это, но в конечном итоге, не обойтись без ресурсов Kubernetes для подхода Gitlab.

Если вы только начинаете или просто не хотите иметь дело с контейнерами, использование BashOperator для вызова CLI dbt Core может быть отличным способом начать планирование ваших рабочих нагрузок dbt с помощью Airflow.

Важно отметить, что независимо от выбранного вами подхода, это всего лишь первый шаг; ваши реальные производственные потребности могут иметь больше требований. Если вам нужна гранулярность и зависимости между вашими моделями dbt, как это делает команда в Updater, вам может потребоваться деконструировать весь DAG dbt в Airflow. Если вы готовы управлять некоторыми дополнительными зависимостями, но хотите максимизировать контроль над тем, какие абстракции вы предоставляете своим конечным пользователям, вы можете использовать GoCardlessProvider, который оборачивает BashOperator и CLI dbt Core.

Повторный запуск задач с точки сбоя

До недавнего времени одним из самых больших недостатков любого из вышеупомянутых подходов была невозможность повторного запуска задачи с точки сбоя — не было простого способа сделать это. Однако с версии dbt 1.0 dbt теперь поддерживает возможность повторного запуска задач с точки сбоя, что должно значительно улучшить качество жизни.

Раньше, если вы запускали 100 моделей dbt и одна из них завершалась сбоем, это было бы обременительно. Вам пришлось бы либо перезапустить все 100, либо жестко закодировать повторный запуск сбойной модели.

Один из примеров этого — dbt run –select <manually-selected-failed-model>.

Теперь вы можете использовать следующую команду:

dbt build –select result:error+ –defer –state <previous_state_artifacts> … и это все!

Вы можете увидеть больше примеров здесь.

Это означает, что независимо от того, активно ли вы разрабатываете или просто хотите повторно запустить запланированную задачу (из-за, скажем, ошибок разрешений или тайм-аутов в вашей базе данных), теперь у вас есть единый подход для выполнения обоих.

airflow dbt run select

В контексте Airflow, вы можете использовать эту команду с TriggerRules, чтобы в случае сбоя вашей начальной модели вы могли продолжать повторный запуск с точки сбоя, не покидая интерфейс Airflow. Это может быть особенно удобно, когда причина сбоя вашей модели не связана с самим кодом модели (разрешения для определенных схем, плохие данные и т.д.)

airflow dbt run select

dbt Cloud + Airflow

Использование провайдера dbt Cloud

С новым провайдером dbt Cloud вы можете использовать Airflow для оркестрации и мониторинга ваших задач dbt Cloud без каких-либо накладных расходов dbt Core. Из коробки провайдер dbt Cloud поставляется с:

Оператором, который позволяет вам как запускать предопределенную задачу в dbt Cloud, так и загружать артефакт из задачи dbt Cloud. Хуком, который предоставляет вам безопасный способ использовать менеджер подключений Airflow для подключения к dbt Cloud. Оператор использует хук, но вы также можете использовать хук напрямую в функции Taskflow или PythonOperator, если есть пользовательская логика, которая вам нужна и не покрыта в Операторе.

Сенсором, который позволяет вам опрашивать завершение задачи. Вы можете использовать это для рабочих нагрузок, где вы хотите убедиться, что ваша задача dbt выполнена перед продолжением вашего DAG. Кратко - это сочетает в себе сквозную видимость всего (от извлечения до моделирования данных), которую вы знаете и любите в Airflow, с богатым и интуитивно понятным интерфейсом dbt Cloud.

Настройка Airflow и dbt Cloud

Чтобы настроить Airflow и dbt Cloud, вы можете следовать пошаговым инструкциям: здесь

Если ваша задача выдает ошибку или завершается сбоем в любом из вышеупомянутых случаев использования, вы можете просмотреть логи в dbt Cloud (подумайте: инженеры данных могут доверять аналитикам для решения ошибок).

Это создает гораздо более естественную передачу эстафеты и ясность в том, кто должен исправить что.

И если моя цель — доставить надежные данные, я выбираю эту простоту и ясность каждый раз.

Но здесь нет правильных или неправильных решений! Любая комбинация инструментов, которая решает проблему доставки надежных данных для вашей команды, является правильным выбором.

Comments

Loading