Перейти к основному содержимому

Миграция с DDL, DML и хранимых процедур

Обновлен
Migration
dbt Core
Beginner
Menu

    Введение

    Одна из наиболее распространенных ситуаций, с которой сталкиваются новые пользователи dbt, — это историческая кодовая база преобразований, написанная как смесь DDL и DML операторов или хранимых процедур. Переход от DML операторов к моделям dbt часто является сложной задачей для новых пользователей, поскольку этот процесс включает значительный сдвиг парадигмы от процедурного потока построения набора данных (например, серия DDL и DML операторов) к декларативному подходу к определению набора данных (например, как dbt использует SELECT операторы для выражения моделей данных). Это руководство призвано предоставить советы, хитрости и общие шаблоны для преобразования DML операторов в модели dbt.

    Подготовка к миграции

    Прежде чем углубляться в процесс преобразования, стоит отметить, что DML операторы не всегда иллюстрируют полный набор столбцов и типов столбцов, которые может содержать исходная таблица. Без знания DDL для создания таблицы невозможно точно определить, является ли ваше усилие по преобразованию точным, но вы можете приблизиться к этому.

    Если ваш поддерживает SHOW CREATE TABLE, это может быть быстрым способом получить полный набор столбцов, которые вы захотите воссоздать. Если у вас нет DDL, но вы работаете с крупной хранимой процедурой, один из подходов, который может сработать, — это извлечь списки столбцов из любых DML операторов, которые изменяют таблицу, и собрать полный набор столбцов, которые появляются.

    Что касается обеспечения правильных типов столбцов, поскольку модели, материализованные dbt, обычно используют CREATE TABLE AS SELECT или CREATE VIEW AS SELECT в качестве драйвера для создания объектов, таблицы могут оказаться с непреднамеренными типами столбцов, если запросы не являются явными. Например, если вам важно различие между INT, DECIMAL и NUMERIC, лучше всего будет быть явным. Хорошая новость заключается в том, что это легко сделать с dbt: вы просто приводите столбец к нужному типу.

    Мы также обычно рекомендуем, чтобы переименование столбцов и приведение типов происходило как можно ближе к исходным таблицам, обычно на уровне промежуточных преобразований, что помогает гарантировать, что будущие моделисты dbt будут знать, где искать эти преобразования! См. Как мы структурируем наши проекты dbt для получения дополнительной информации о структуре проекта в целом.

    Операции, которые нам нужно сопоставить

    Существует четыре основных DML оператора, которые вам, вероятно, придется преобразовать в операции dbt при миграции процедуры:

    • INSERT
    • UPDATE
    • DELETE
    • MERGE

    Каждый из них можно обработать с помощью различных техник в dbt. Обработка MERGE немного сложнее, чем остальных, но может быть эффективно выполнена с помощью dbt. Первые три, однако, довольно просты для преобразования.

    Сопоставление INSERT

    Оператор INSERT функционально аналогичен использованию dbt для SELECT из существующего источника или другой модели dbt. Если вы столкнулись с оператором INSERT-SELECT, самый простой способ преобразовать оператор — это просто создать новую модель dbt и извлечь часть SELECT из оператора INSERT из процедуры и поместить ее в модель. Вот и все!

    Чтобы действительно разобрать это, давайте рассмотрим простой пример:

    INSERT INTO returned_orders (order_id, order_date, total_return)

    SELECT order_id, order_date, total FROM orders WHERE type = 'return'

    Преобразование этого с первого раза в модель dbt (в файле, называемом returned_orders.sql) может выглядеть примерно так:

    SELECT
    order_id as order_id,
    order_date as order_date,
    total as total_return

    FROM {{ ref('orders') }}

    WHERE type = 'return'

    Функционально это создаст модель (которая может быть материализована как таблица или представление в зависимости от потребностей), называемую returned_orders, которая содержит три столбца: order_id, order_date, total_return, основанные на столбце type. Это достигает той же цели, что и INSERT, только в декларативной форме, используя dbt.

    Заметка о FROM

    В dbt использование жестко закодированного имени таблицы или представления в FROM является одной из самых серьезных ошибок, которые делают новые пользователи. dbt использует макросы ref и source для определения порядка, в котором должны выполняться преобразования, и если вы их не используете, вы не сможете воспользоваться встроенной генерацией родословной данных и выполнением конвейера в dbt. В примерах кода на протяжении оставшейся части этой статьи мы будем использовать операторы ref в dbt-преобразованных версиях SQL операторов, но это упражнение для читателя — убедиться, что эти модели существуют в их проектах dbt.

    Последовательные INSERT в существующую таблицу можно объединить с помощью UNION ALL

    Поскольку модели dbt фактически выполняют один CREATE TABLE AS SELECT (или, если разбить это на шаги, CREATE, затем INSERT), вы можете столкнуться с сложностями, если в вашем преобразовании есть несколько операторов INSERT, которые все вставляют данные в одну и ту же таблицу. К счастью, это простая задача для обработки в dbt. Фактически, логика выполняет UNION ALL между запросами INSERT. Если у меня есть поток преобразования, который выглядит примерно так (игнорируйте надуманность сценария):

    CREATE TABLE all_customers

    INSERT INTO all_customers SELECT * FROM us_customers

    INSERT INTO all_customers SELECT * FROM eu_customers

    Версия dbt этого будет выглядеть примерно так:

    SELECT * FROM {{ ref('us_customers') }}

    UNION ALL

    SELECT * FROM {{ ref('eu_customers') }}

    Логика функционально эквивалентна. Таким образом, если есть еще один оператор, который INSERT в модель, которую я уже создал, я могу просто добавить эту логику во второй оператор SELECT, который просто объединен с первым с помощью UNION ALL. Легко!

    Сопоставление UPDATE

    UPDATE начинают увеличивать сложность ваших преобразований, но, к счастью, они также довольно просты для миграции. Процесс мышления, который вы проходите при переводе UPDATE, очень похож на то, как работает INSERT, но логика для списка SELECT в модели dbt в основном исходит из содержимого секции SET оператора UPDATE. Давайте рассмотрим простой пример:

    UPDATE orders

    SET type = 'return'

    WHERE total < 0

    Способ взглянуть на это аналогичен оператору INSERT-SELECT. Таблица, которая обновляется, — это модель, которую вы хотите изменить, и поскольку это UPDATE, эта модель, вероятно, уже была создана, и вы можете либо:

    • добавить к ней с последующими преобразованиями
    • создать промежуточную модель, которая строится на основе оригинальной модели — возможно, назвав ее что-то вроде int_[entity]_[verb].sql.

    Список SELECT должен содержать все столбцы для таблицы, но для конкретных столбцов, которые обновляются DML, вы будете использовать вычисление на правой стороне знака равенства в качестве значения SELECT. Затем вы можете использовать целевое имя столбца на левой стороне знака равенства в качестве псевдонима столбца.

    Если бы я создавал промежуточное преобразование из приведенного выше запроса, это выглядело бы примерно так:

    SELECT
    CASE
    WHEN total < 0 THEN 'return'
    ELSE type
    END AS type,

    order_id,
    order_date

    FROM {{ ref('stg_orders') }}

    Поскольку оператор UPDATE не изменяет каждое значение столбца type, мы используем оператор CASE, чтобы применить содержимое условия WHERE. Мы все еще хотим выбрать все столбцы, которые должны оказаться в целевой таблице. Если мы оставим один из столбцов, он вообще не будет передан в целевую таблицу из-за декларативного подхода dbt.

    Иногда вы можете не знать, какие все столбцы находятся в таблице, или в ситуации, как выше, вы изменяете только небольшое количество столбцов относительно общего количества столбцов в таблице. Может быть обременительно перечислять каждый столбец в таблице, но, к счастью, dbt содержит некоторые полезные утилиты макросов, которые могут помочь перечислить полный список столбцов таблицы.

    Другой способ, которым я мог бы написать модель немного более динамично, может быть:

    SELECT
    {{ dbt_utils.star(from=ref('stg_orders'), except=['type']) }},
    CASE
    WHEN total < 0 THEN 'return'
    ELSE type
    END AS type,

    FROM {{ ref('stg_orders') }}

    Макрос dbt_utils.star() выведет полный список столбцов в таблице, но пропустит те, которые я перечислил в списке исключений, что позволяет мне выполнять ту же логику, написав меньше строк кода. Это простой пример использования макросов dbt для упрощения и сокращения вашего кода, и dbt может стать гораздо более сложным, когда вы изучите больше техник. Прочитайте больше о пакете dbt_utils и макросе star.

    Сопоставление DELETE

    Одно из самых больших различий между процедурным преобразованием и тем, как dbt моделирует данные, заключается в том, что dbt, в общем, никогда не уничтожает данные. Хотя существуют способы выполнения жестких DELETE в dbt, которые выходят за рамки этой статьи, общая лучшая практика для обработки удаленных данных — просто использовать мягкие удаления и фильтровать мягко удаленные данные в конечном преобразовании.

    Рассмотрим простой пример запроса:

    DELETE FROM stg_orders WHERE order_status IS NULL

    В модели dbt вам нужно сначала определить записи, которые должны быть удалены, а затем отфильтровать их. Существует действительно два основных способа, которыми вы можете перевести этот запрос:

    SELECT * FROM {{ ref('stg_orders') }} WHERE order_status IS NOT NULL

    Этот первый подход просто инвертирует логику DELETE, чтобы описать набор записей, которые должны остаться, вместо набора записей, которые должны быть удалены. Это связано с тем, как dbt декларативно описывает наборы данных. Вы ссылаетесь на данные, которые должны быть в наборе данных, и таблица или представление создается с этим набором данных.

    Другой способ, которым вы могли бы достичь этого, — это пометить удаленные записи, а затем отфильтровать их. Например:

    WITH

    soft_deletes AS (

    SELECT
    *,
    CASE
    WHEN order_status IS NULL THEN true
    ELSE false
    END AS to_delete

    FROM {{ ref('stg_orders') }}

    )

    SELECT * FROM soft_deletes WHERE to_delete = false

    Этот подход помечает все удаленные записи, и финальный SELECT фильтрует любые удаленные данные, так что результирующая таблица содержит только оставшиеся записи. Это гораздо более многословно, чем просто инвертировать логику DELETE, но для сложной логики DELETE это оказывается очень эффективным способом выполнения DELETE, который сохраняет исторический контекст.

    Стоит отметить, что хотя это не позволяет выполнить жесткое удаление, жесткие удаления могут быть выполнены несколькими способами, наиболее распространенным из которых является выполнение dbt макросов через run-operation, или с помощью post-hook для выполнения оператора DELETE после того, как записи, которые должны быть удалены, были помечены. Это продвинутые подходы, выходящие за рамки этого руководства.

    Сопоставление MERGE

    dbt имеет концепцию, называемую материализацией, которая определяет, как модель физически или логически представлена в хранилище. INSERT, UPDATE и DELETE обычно выполняются с использованием таблицы или представления материализаций. Для инкрементных рабочих нагрузок, выполняемых с помощью команд, таких как MERGE или UPSERT, dbt имеет особую материализацию, называемую инкрементальной. Инкрементальная материализация специально используется для обработки инкрементных загрузок и обновлений таблицы без воссоздания всей таблицы с нуля при каждом запуске.

    Шаг 1: Сопоставьте MERGE как INSERT/UPDATE для начала

    Прежде чем мы углубимся в точные детали того, как реализовать инкрементальную материализацию, давайте поговорим о логическом преобразовании. Извлечение логики MERGE и обработка ее так же, как INSERT или UPDATE, — это самый простой способ начать миграцию команды MERGE.

    Чтобы увидеть, как работает логическое преобразование, начнем с примера MERGE. В этом сценарии представьте приложение для совместного использования поездок, где поездки загружаются в таблицу деталей ежедневно, а чаевые могут быть обновлены в более поздний срок и должны быть актуальными:

    MERGE INTO ride_details USING (
    SELECT
    ride_id,
    subtotal,
    tip

    FROM rides_to_load AS rtl

    ON ride_details.ride_id = rtl.ride_id

    WHEN MATCHED THEN UPDATE

    SET ride_details.tip = rtl.tip

    WHEN NOT MATCHED THEN INSERT (ride_id, subtotal, tip)
    VALUES (rtl.ride_id, rtl.subtotal, NVL(rtl.tip, 0, rtl.tip)
    );

    Содержимое оператора USING — это полезный фрагмент кода, потому что его можно легко поместить в CTE в качестве отправной точки для обработки оператора совпадения. Я считаю, что самый простой способ разбить это — это рассматривать каждый оператор совпадения как отдельный CTE, который строится на предыдущих операторах совпадения.

    Мы можем игнорировать оператор ON на данный момент, так как он будет играть роль только тогда, когда мы будем готовы превратить это в инкрементальную материализацию.

    Как и в случае с UPDATE и INSERT, вы можете использовать список SELECT и псевдонимы для именования столбцов соответствующим образом для целевой таблицы и объединять INSERT операторы (принимая во внимание использование UNION, а не UNION ALL, чтобы избежать дубликатов).

    MERGE будет переведен примерно так:

    WITH

    using_clause AS (

    SELECT
    ride_id,
    subtotal,
    tip

    FROM {{ ref('rides_to_load') }}

    ),

    updates AS (

    SELECT
    ride_id,
    subtotal,
    tip

    FROM using_clause

    ),

    inserts AS (

    SELECT
    ride_id,
    subtotal,
    NVL(tip, 0, tip)

    FROM using_clause

    )

    SELECT *

    FROM updates

    UNION inserts

    Чтобы было ясно, это преобразование не завершено. Логика здесь похожа на MERGE, но на самом деле не будет делать то же самое, так как CTEs обновлений и вставок оба выбирают из одного и того же исходного запроса. Нам нужно будет убедиться, что мы захватываем отдельные наборы данных, когда мы переходим к инкрементальной материализации.

    Одно важное предостережение заключается в том, что dbt не поддерживает нативно DELETE как действие MATCH. Если у вас есть строка в вашем операторе MERGE, которая использует WHEN MATCHED THEN DELETE, вы захотите рассматривать это как обновление и добавить флаг мягкого удаления, который затем фильтруется в последующем преобразовании.

    Шаг 2: Преобразование в инкрементальную материализацию

    Как упоминалось выше, инкрементальные материализации немного особенные, поскольку, когда целевая таблица не существует, материализация функционирует почти так же, как стандартная материализация таблицы, и выполняет оператор CREATE TABLE AS SELECT. Если целевая таблица существует, однако, материализация вместо этого выполняет оператор MERGE.

    Поскольку MERGE требует условия JOIN между оператором USING и целевой таблицей, нам нужен способ указать, как dbt определяет, вызывает ли запись совпадение или нет. Эта конкретная информация указывается в конфигурации модели dbt.

    Мы можем добавить следующий блок config() в начало нашей модели, чтобы указать, как она должна строиться инкрементально:

    {{
    config(
    materialized='incremental',
    unique_key='ride_id',
    incremental_strategy='merge'
    )
    }}

    Три поля конфигурации в этом примере являются наиболее важными.

    • Установка materialized='incremental' сообщает dbt применять логику UPSERT к целевой таблице.
    • unique_key должен быть первичным ключом целевой таблицы. Это используется для сопоставления записей с существующей таблицей.
    • incremental_strategy здесь установлен на MERGE, чтобы объединить любые существующие строки в целевой таблице со значением для unique_key, которое совпадает с входящей партией данных. Существуют различные инкрементальные стратегии для различных ситуаций и хранилищ.

    Основная часть работы по преобразованию модели в инкрементальную материализацию заключается в определении того, как логика должна изменяться для инкрементных загрузок по сравнению с полными загрузками или начальными загрузками. dbt предлагает специальный макрос, is_incremental(), который оценивается как ложный для начальных загрузок или для полных загрузок (называемых полными обновлениями в терминологии dbt), но истинный для инкрементных загрузок.

    Этот макрос можно использовать для дополнения кода модели, чтобы настроить, как данные загружаются для последующих загрузок. Как эта логика должна быть добавлена, будет немного зависеть от того, как данные получены. Некоторые общие способы могут быть:

    1. Исходная таблица очищается перед инкрементными загрузками и содержит только данные, которые должны быть загружены в этом инкременте.
    2. Исходная таблица содержит все исторические данные, и есть столбец временной метки загрузки, который идентифицирует новые данные, которые должны быть загружены.

    В первом случае работа фактически уже выполнена. Поскольку исходная таблица всегда содержит только новые данные, которые должны быть загружены, запрос не должен изменяться для инкрементных загрузок. Однако второй случай требует использования макроса is_incremental(), чтобы правильно обработать логику.

    Взяв преобразованный оператор MERGE, который мы собрали ранее, мы бы дополнили его, чтобы добавить эту дополнительную логику:

    WITH

    using_clause AS (

    SELECT
    ride_id,
    subtotal,
    tip,
    max(load_timestamp) as load_timestamp

    FROM {{ ref('rides_to_load') }}


    {% if is_incremental() %}

    WHERE load_timestamp > (SELECT max(load_timestamp) FROM {{ this }})

    {% endif %}

    ),

    updates AS (

    SELECT
    ride_id,
    subtotal,
    tip,
    load_timestamp

    FROM using_clause

    {% if is_incremental() %}

    WHERE ride_id IN (SELECT ride_id FROM {{ this }})

    {% endif %}

    ),

    inserts AS (

    SELECT
    ride_id,
    subtotal,
    NVL(tip, 0, tip),
    load_timestamp

    FROM using_clause

    WHERE ride_id NOT IN (SELECT ride_id FROM updates)

    )

    SELECT * FROM updates UNION inserts

    Здесь есть несколько важных концепций, которые нужно понять:

    1. Код в условном блоке is_incremental() выполняется только для инкрементных выполнений этого кода модели. Если целевая таблица не существует или если используется опция --full-refresh, этот код не будет выполняться.
    2. {{ this }} — это специальное ключевое слово в dbt, которое при использовании в блоке Jinja самоссылается на модель, для которой выполняется код. Таким образом, если у вас есть модель в файле, называемом my_incremental_model.sql, {{ this }} будет ссылаться на my_incremental_model (полностью квалифицированное с именем базы данных и схемы, если необходимо). Используя это ключевое слово, мы можем использовать текущее состояние целевой таблицы, чтобы информировать исходный запрос.

    Миграция хранимых процедур

    Техники, описанные выше, являются полезными способами начать преобразование отдельных DML операторов, которые часто встречаются в хранимых процедурах. Используя такие шаблоны, устаревший процедурный код можно быстро преобразовать в модели dbt, которые гораздо более читаемы, поддерживаемы и соответствуют лучшим практикам программной инженерии, таким как принципы DRY. Кроме того, как только преобразования переписаны как модели dbt, становится гораздо проще тестировать преобразования, чтобы гарантировать, что данные, используемые в дальнейшем, являются качественными и надежными.

    0