Перейти к основному содержимому

Масштабирование дата-пайплайнов для финтех-компании на этапе роста с помощью инкрементальных моделей

· 15 мин. чтения
Adedamola Onabanjo
BI Manager at Kuda

Введение

Построение масштабируемых дата-пайплайнов в быстрорастущем финтехе часто похоже на починку велосипеда на ходу. Нужно постоянно поставлять инсайты, даже когда объёмы данных взрывообразно растут. В Kuda (нигерийском необанке) мы столкнулись с этой проблемой по мере стремительного роста пользовательской базы. Классический пакетный ETL (полная пересборка таблиц при каждом запуске) начал давать сбои: пайплайны выполнялись часами, а расходы резко выросли. Нам нужно было поддерживать актуальность данных, не перерабатывая всё целиком. Решением стало использование incremental models в dbt, которые обрабатывают только новые или изменённые записи. Это радикально сократило время выполнения и снизило затраты в BigQuery, позволив нам эффективно масштабироваться.

Проблемы масштабирования

Быстрый рост принёс с собой серьёзные проблемы масштабирования, и ключевыми из них были:

  • Производительность: наши ночные модели с full-refresh, которые раньше выполнялись за минуты, по мере роста данных начали занимать часы. Например, основная таблица транзакций стала слишком медленной для полной пересборки при каждом обновлении. Аналитические дашборды начали отставать, а стейкхолдеры — терять своевременные инсайты. В финтехе, близком к реальному времени, такая задержка недопустима.

  • Стоимость: рост объёма данных и времени обработки напрямую увеличивал счета BigQuery. Сканировать таблицу на 2 ТБ каждый час ради нескольких мегабайт новых данных — расточительно. При on-demand модели ценообразования BigQuery это легко может вылиться в тысячи долларов в месяц. Нам нужно было увеличивать пропускную способность, не масштабируя стоимость линейно, а значит — переосмыслить обработку и уйти от полного сканирования таблиц.

  • Целостность данных: по мере роста числа пайплайнов и зависимостей увеличивались риски несогласованностей и сбоев. Любой новый подход должен был сохранять точность и консистентность данных, даже при ускорении процессов.

Подход: Incremental models и ключевые стратегии

Мы решили эти задачи, сделав ставку на incremental models в dbt, которые обрабатывают только новые или обновлённые записи с момента последнего запуска. Вместо монолитных ежедневных пересборок наши модели стали постоянно подхватывать изменения небольшими порциями. Ниже мы описываем наши ключевые incremental strategiesappend, insert_overwrite и merge — а также то, как мы настраивали производительность и стоимость.

Стратегия Append

Это самый простой инкрементальный подход: при каждом запуске новые строки добавляются в существующую таблицу, а старые никогда не трогаются. Он идеально подходит для append-only данных (например, логов или транзакций, которые не меняются после вставки).

В dbt использование append выглядит просто. Мы настраиваем модель как incremental и указываем incremental_strategy='append' (поддерживается некоторыми адаптерами, например Snowflake).

Note: append в настоящее время не поддерживается в BigQuery. Перед выбором стратегии всегда проверяйте поддержку адаптера.

В SQL мы фильтруем источник так, чтобы выбирать только новые записи с момента последней загрузки. Например, для инкрементальной загрузки новых транзакций:

Code: Append Incremental Strategy

{{ config(
materialized = 'incremental',
incremental_strategy = 'append'
) }}
SELECT 
transaction_id,
customer_id,
transaction_date,
amount,
status
FROM {{ source('core', 'transactions') }}
{% if is_incremental() %}
WHERE transaction_date > (
SELECT MAX(transaction_date) FROM {{ this }}
)
{% endif %}

Этот запрос добавляет только те транзакции, у которых transaction_date больше максимальной даты транзакции в целевой таблице.

Append Incremental Model – Before Incremental Run

transaction_idcustomer_idtransaction_dateamountstatus
10001C0012023-09-28₦12,000completed
10002C0022023-09-28₦5,000completed
Loading table...

Append Incremental Model – After Incremental Run

transaction_idcustomer_idtransaction_dateamountstatus
10001C0012023-09-28₦12,000completed
10002C0022023-09-28₦5,000completed
10003C0032023-09-29₦8,500completed
10004C0042023-09-30₦7,250completed
10005C0052023-09-30₦3,100pending
Loading table...

Иллюстрация: таблица «Before» показывает данные до инкрементального запуска; таблица «After» — новые транзакции (выделены жирным), которые были добавлены. Исторические данные не затрагиваются. Append отлично подходит для неизменяемых потоков данных (например, журналов транзакций или событий, которые только растут).

Append хорошо подошёл нам для ingestion-пайплайнов, которые просто накапливают историю без переработки старых данных. Однако нужно было защищаться от дубликатов (если источник может повторно присылать записи, мы применяли дедупликацию или уникальные ограничения). Кроме того, чистый append не справляется с обновлениями или удалениями существующих записей. Если данные могут изменяться после вставки (например, статус транзакции меняется с "pending" на "completed"), нужна другая стратегия.

Стратегия Insert Overwrite

Для данных, разбитых на партиции по дате (или другому ключу) и требующих частичной перезаписи, идеально подходит insert_overwrite. Вместо построчного merge эта стратегия полностью перезаписывает целые партиции целевой таблицы при каждом запуске. Таблица должна быть партиционирована (по дням, часам и т.д.), и модель будет пересобирать только те партиции, в которых есть новые или обновлённые данные.

Мы использовали insert_overwrite для партиционированных данных, таких как дневные агрегаты, где изменения локализованы по дате. Например, если таблица партиционирована по transaction_date, модель с insert_overwrite может обновить только партицию за «2023-10-01», не затрагивая остальные дни.

Вот как мы настраивали модель с insert_overwrite в BigQuery:

Code: Insert Overwrite Strategy

{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = { 'field': 'transaction_date', 'data_type': 'date' }
) }}
SELECT
customer_id,
transaction_date,
amount,
transaction_type
FROM {{ source('core', 'transactions') }}
WHERE transaction_date >= {{ this.last_partition }}

Здесь partition_by задаёт партиционирование таблицы. В WHERE используется {{ this.last_partition }} (последняя партиция, уже присутствующая в целевой таблице), чтобы выбирать данные только для новых или обновлённых партиций. При каждом запуске BigQuery заменяет существующие партиции, попадающие под фильтр (например, партицию за последнюю дату), результатами запроса. Старые партиции остаются нетронутыми.

Insert Overwrite Strategy – Before Incremental Run

transaction_datetransaction_idcustomer_idamountstatus
2023-09-2911001C011₦14,000completed
2023-09-2911002C012₦6,500completed
2023-10-0112001C021₦8,000pending
2023-10-0112002C022₦4,200completed
Loading table...

(Партиция, которая будет перезаписана, выделена жирным.)

Insert Overwrite Strategy – After Incremental Run

transaction_datetransaction_idcustomer_idamountstatus
2023-09-2911001C011₦14,000completed
2023-09-2911002C012₦6,500completed
2023-10-0112003C023₦8,150completed
2023-10-0112004C024₦3,900completed
2023-10-0112005C025₦5,500completed
Loading table...

(Новые данные партиции показаны жирным — они полностью заменили старую партицию.)

Иллюстрация: «Before» показывает партиционированную таблицу с выделенной партицией за 1 октября 2023 года; «After» — эту же партицию, заменённую свежими строками. Такой подход позволяет обновлять данные за конкретный день (например, чтобы учесть поздно пришедшие транзакции или исправления), не пересобирая всю таблицу.

В Kuda insert_overwrite оказался особенно полезен для производных таблиц и агрегатов. Например, наши дневные агрегаты трат клиентов обновлялись инкрементально путём замены только данных за последний день, что обеспечивало точность при минимальной стоимости. Перезаписывая целые партиции, мы избегали сложных построчных merge, но при этом учитывали любые корректировки внутри дня (например, транзакции с задней датой).

Отдельно стоит отметить статические и динамические партиции. Мы в основном использовали статические дневные партиции (перезаписывая целые дни). Некоторые хранилища (и более новые возможности dbt) поддерживают динамическое обновление партиций (обновление только изменённых строк внутри партиции), но мы выбрали замену целого дня ради простоты. Проще сказать: «каждый запуск пересобирает партицию за вчера», гарантируя, что все поздние изменения за этот день будут учтены. Это резко улучшило производительность для больших таблиц (никаких full table scan), при этом сохранив корректность данных. Главное — правильно согласовать поле partition_by и логику фильтра, чтобы не перезаписать не ту партицию.

Стратегия Merge

Для таблиц, где поступают новые записи и при этом могут изменяться существующие, мы использовали стратегию merge. Она выполняет upsert по уникальному ключу: новые строки вставляются, а если ключ уже существует, указанные поля обновляются. Это идеально подходит для таких данных, как профили клиентов или балансы счетов, которые со временем меняются.

В dbt использование incremental_strategy='merge' требует указания unique_key (в BigQuery или Snowflake dbt компилирует это в оператор MERGE). Также можно ограничить список обновляемых колонок с помощью merge_update_columns или исключить поля через merge_exclude_columns. Например:

Code: Merge Strategy

{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'account_id',
merge_update_columns = ['balance', 'last_updated']
) }}
SELECT 
account_id,
balance,
last_updated
FROM {{ source('core', 'accounts') }}
{% if is_incremental() %}
WHERE last_updated > (
SELECT MAX(last_updated) FROM {{ this }}
)
{% endif %}

Эта модель выбирает только новые или обновлённые записи (с last_updated позже максимального значения в целевой таблице) и merge-ит их в таблицу аккаунтов по account_id. Мы обновляем только поля balance и last_updated для существующих аккаунтов, чтобы не затирать остальные данные. Если входящий account_id ещё не существует в целевой таблице, вставляется новая строка.

Merge был нашей основной стратегией для upsert-сценариев. Например, мы поддерживали ежедневно обновляемую таблицу аккаунтов со статусами и балансами клиентов именно через merge. Каждый день добавлялись новые аккаунты, а изменения (обновления баланса, смена статуса) применялись к существующим записям. Это предотвращало дубликаты (которые возникли бы при простом append) и обеспечивало одну строку на аккаунт с актуальной информацией.

Мы быстро поняли, что к выбору уникальных ключей и обновляемых колонок нужно относиться очень внимательно. В одном случае мы забыли указать merge_exclude_columns и случайно перезаписали timestamp, который хотели сохранить — хороший урок на тему явных настроек. Также merge имеет свою цену по производительности: каждый запуск джойнит новые данные с существующей таблицей. При правильном кластеринге по ключу и небольшом объёме новых данных (например, за день) это было нормально, но на очень больших масштабах требует мониторинга.

Оптимизация производительности и стоимости

Выбор правильной инкрементальной стратегии — это лишь половина дела. Мы также применили ряд техник оптимизации производительности и затрат, чтобы пайплайны действительно масштабировались:

  • Партиционирование: для больших таблиц мы использовали партиционирование по дате (или другому ключу), чтобы инкрементальные запуски сканировали только новый срез данных. Например, партиционирование таблицы транзакций по transaction_date означало, что ежедневная загрузка затрагивает только партицию за этот день. Благодаря pruning партиций в BigQuery объём сканируемых данных сокращался с нескольких терабайт до считанных гигабайт (например, с 2 ТБ до 0.01 ТБ), что давало огромную экономию. Партиционирование также ускоряло downstream-запросы с фильтрацией по дате.

  • Кластеризация: мы добавляли кластеризацию по колонкам, которые часто используются в фильтрах или джойнах (например, кластеризовали транзакции по customer_id). В BigQuery кластеризация упорядочивает данные по этим колонкам, поэтому запросы сканируют меньше данных внутри каждой партиции. Улучшения были не всегда драматичными, но ощутимыми: некоторые запросы, которые раньше сканировали десятки гигабайт, начали читать лишь малую часть.

  • Грамотное расписание: мы настраивали частоту запуска моделей, балансируя свежесть данных и стоимость. Не каждой модели нужно выполняться постоянно. Клиентские таблицы (транзакции, балансы) обновлялись каждый час для почти real-time данных, тогда как внутренние аналитические модели — раз в день или несколько раз в сутки. Корректировка расписаний позволила избежать лишних запусков и сэкономить вычислительные ресурсы. Мы также использовали запуск по зависимостям (через dbt Cloud), чтобы тяжёлые модели стартовали только после обновления upstream-данных, предотвращая бессмысленные прогоны, когда новых данных нет.

  • Настройка вычислительных ресурсов: мы оптимизировали и сам warehouse. Поскольку инкрементальные модели резко сократили объём обработки на запуск, мы смогли использовать меньшие кластеры/слоты и запускать модели чаще, не перерасходуя бюджет — большой плюс на фоне роста данных.

  • Мониторинг и алерты: мы отслеживали метрики вроде времени выполнения и количества обработанных строк, чтобы быстро замечать аномалии. Например, если ежедневная инкрементальная модель обычно добавляет сотни строк, а вдруг добавляет ноль — это тревожный сигнал (сбой upstream или отсутствие данных в источнике). Аналогично, если джоб, который обычно выполняется 5 минут, вдруг занимает 50, скорее всего произошёл непреднамеренный full scan. Мы также следили за свежестью данных: если hourly-модель не загружала новые данные 3 часа, мы разбирались. Эти проверки помогали рано выявлять проблемы (например, устаревший источник или сломанный фильтр) и обеспечивали стабильный поток данных.

Благодаря этим оптимизациям мы значительно улучшили скорость и экономичность пайплайнов. Вместо страха перед очередным скачком данных у нас появилась уверенность, что система спроектирована с запасом на рост.

Реальное внедрение: кейс Kuda

Как все эти подходы сработали на практике в Kuda в период гиперроста?

Один из критичных датасетов — поток клиентских транзакций. Изначально полный ежедневный rebuild таблицы транзакций занимал больше часа и сканировал всю историю. Мы переработали его в incremental model (стратегия append с использованием pruning партиций). Первый запуск построил исторический бэклог, а последующие подтягивали только новые транзакции. Разница была колоссальной: инкрементальный джоб стал выполняться за минуты, а объём сканируемых данных на запуск снизился более чем на 90%. Аналитики видели новые транзакции уже в течение часа, а ежемесячные расходы BigQuery по этой таблице резко упали, несмотря на продолжающийся рост данных.

Для наглядности — упрощённая сводка дневных транзакций. Изначально она содержала данные до 30 сентября 2023 года:

Daily Transactions Summary – Before

datetransactions_counttotal_amount
2023-09-281,04525,100,000
2023-09-2998022,340,000
2023-09-301,10227,500,000
Loading table...

После следующей инкрементальной загрузки (с транзакциями за 1 октября 2023 года) таблица автоматически пополнилась метриками за новый день без пересчёта предыдущих:

Daily Transactions Summary – After

datetransactions_counttotal_amount
2023-09-281,04525,100,000
2023-09-2998022,340,000
2023-09-301,10227,500,000
2023-10-011,21030,230,000
Loading table...

Таблица: пример дневной сводки транзакций. После инкрементальной загрузки за 2023-10-01 данные за новый день появились без переработки предыдущих.

Такой подход позволил нашим командам и клиентам всегда иметь актуальные данные. Служба поддержки могла просматривать почти реальные транзакции для расследования инцидентов, а клиенты — формировать актуальные выписки по счетам.

Мы также использовали incremental models для регуляторной и финансовой отчётности. Например, финансовой команде требовалась ежедневная сверка балансов и таблица аккаунтов на конец дня. Им было достаточно, чтобы данные были на день «старше», но при этом — точными и без дубликатов. Мы реализовали это через ночной incremental merge по таблице аккаунтов, подтягивая изменения из core-данных в ежедневный срез состояний счетов. В результате получался надёжный ежедневный snapshot. (Финансовая команда даже не подозревала о какой-то сложной инкрементальной логике — они просто получали отчёт каждое утро.)

Во время запуска нового карточного продукта нам понадобился почти real-time мониторинг отказов и ошибок транзакций. Наш дашборд с дневным обновлением уже не справлялся. Мы настроили инкрементальную модель, которая загружала события карточных транзакций каждые 15 минут. Чтобы не пропустить исторические исправления, мы дополнительно запланировали ночной full refresh этой модели на период запуска. Такой гибридный подход дал нам оперативную видимость и ежедневный «догоняющий» пересчёт для поздних корректировок. Это оказалось критически важно: мы рано заметили проблемы (например, всплеск отказов из-за сбоя API) и быстро их исправили, минимизировав влияние на клиентов. После стабилизации мы вернулись к чисто инкрементальным запускам.

Общий эффект для Kuda был огромным. Некоторые тяжёлые трансформации, которые были близки к падению, снова стали надёжными. Стейкхолдеры заметили, что данные в отчётах стали свежее, а удовлетворённость клиентов выросла, поскольку никто больше не видел устаревшую информацию. Контролируя затраты и поддерживая эффективность пайплайнов, мы сохранили доверие менеджмента и регуляторов.

Выводы и лучшие практики

За время масштабирования мы вынесли множество уроков о том, что работает, а что — нет. Вот ключевые выводы и best practices, которые мы бы рекомендовали любому финтеху на стадии роста, планирующему внедрение incremental models:

  • Выбирайте правильную стратегию: не всем таблицам подходит один и тот же инкрементальный подход. В общем случае используйте append для insert-only данных, insert_overwrite — для данных, партиционированных по дате (или ID), где можно заменять целые блоки, и merge — для настоящих upsert-сценариев. Если важны удаления в источнике, рассмотрите delete+insert или обработку soft delete.

  • Грамотно партиционируйте: партиционирование критически важно, но нужно выбрать правильную гранулярность. Подходящее зерно (час, день, месяц) зависит от объёма данных и паттернов запросов. Для нас дневные партиции часто были оптимальны: достаточно мелкие, чтобы уменьшать сканирование, но не настолько, чтобы создавать тысячи партиций. Всегда согласовывайте инкрементальный фильтр с полем партиционирования, чтобы включить pruning.

  • Мониторьте модели: внедряйте тесты или алерты для инкрементальных моделей. Например, проверяйте, что количество строк, добавленных за запуск, находится в ожидаемых пределах. Если ежедневная модель, которая обычно добавляет сотни строк, внезапно добавляет ноль — это тревожный сигнал. Раннее обнаружение проблем предотвращает более серьёзные сбои.

  • Периодические full refresh: со временем даже хорошо построенная инкрементальная модель может «уплыть» из-за мелких ошибок или изменений схемы. Мы планировали периодические full refresh для критичных моделей, чтобы синхронизировать их с источниками и поймать любые расхождения. Аналогично, после крупных изменений логики мы делали разовый full refresh, а затем снова возвращались к incremental.

  • Тестируйте и документируйте: мы относились к инкрементальным моделям как к критически важному коду. Писали тесты, чтобы убедиться в корректности логики (например, что после инкрементального запуска количество записей за период совпадает с источником; если нет — фильтр, вероятно, неверен). Также мы документировали предположения каждой модели (например: «эта модель работает инкрементально, не отключайте фильтр is_incremental в разработке»). Хорошая документация помогала новым членам команды не ломать инкрементальную логику.

  • Проектируйте с расчётом на рост: главный урок — планировать масштаб заранее. Теперь, проектируя новую модель или пайплайн, мы спрашиваем себя: «что будет, если данных станет в 10 раз больше?». Если full refresh в таком масштабе невозможен, мы сразу строим модель инкрементально. Это намного проще, чем рефакторинг под давлением позже. Такой подход в сочетании с гибкими возможностями incremental в dbt сделал наши пайплайны устойчивыми к будущему росту. По мере увеличения объёма данных инкрементальный подход продолжит нас выручать.

Заключение

Масштабирование финтех-платформы данных не обязательно означает пропорциональный рост затрат и времени выполнения. Используя incremental models в dbt — в сочетании с партиционированием, кластеризацией и продуманным расписанием — Kuda смогла трансформировать свои пайплайны под условия быстрого роста. Мы сохранили свежесть и точность данных для пользователей, не выходя за рамки бюджета. Инкрементальная обработка позволила справляться с постоянно растущими объёмами небольшими порциями, сохраняя гибкость по мере развития компании.

Если вы работаете в растущей компании и сталкиваетесь с медленными или дорогими джобами, попробуйте incremental models в dbt. Как показал наш опыт в Kuda, отдача может быть колоссальной: более быстрые инсайты, довольные стейкхолдеры и платформа данных, готовая к любым вызовам будущего. Будущее обработки данных (в финтехе и не только) — инкрементальное. С такими инструментами, как dbt, вы можете оседлать волну роста, а не утонуть в ней.

Comments

Loading