Безсерверный стек данных с бесплатным уровнем с dlt + dbt core.
Проблема, разработчик и инструменты
Проблема: Мы с партнершей рассматриваем возможность покупки недвижимости в Португалии. Здесь нет справочных данных по рынку недвижимости — сколько домов продается, по какой цене? Никто не знает, кроме агентств недвижимости и, возможно, банков, и они неохотно делятся этой информацией. Единственный источник данных, который у нас есть, — это Idealista, портал, где агентства недвижимости размещают объявления.
К сожалению, количество объектов значительно меньше, чем количество объявлений — кажется, многие агентства недвижимости повторно размещают те же объявления, что и другие, с намеренно измененными данными и часто вводящей в заблуждение информацией. Агентства делают это, чтобы заинтересованные стороны обращались к ним за разъяснениями, и с этого момента они могут начать процесс продажи. В то же время, сайт с объявлениями заинтересован в том, чтобы это продолжалось, так как они получают оплату за каждое объявление, а не за объект.
Разработчик: Я фрилансер в области данных, который разрабатывает комплексные решения, поэтому, когда у меня возникает проблема с данными, я не могу просто оставить ее.
Инструменты: Я хочу запустить свой проект на Google Cloud Functions из-за щедрого бесплатного уровня. dlt — это новая библиотека на Python для декларативной загрузки данных, которую я давно хотел протестировать. Наконец, я буду использовать dbt Core для трансформации.
Начальная точка
Если я хочу иметь надежную информацию о состоянии рынка, мне нужно:
- Собрать неструктурированные дан ные с Idealista и сохранить их историю.
- Удалить дубликаты существующих объявлений.
- Попробовать выяснить, за сколько были проданы объекты.
Как только у меня будут объявления без дубликатов с некоторой историей, я смогу понять:
- Насколько дорогие объекты.
- Как быстро они продаются, что, надеюсь, будет сигналом о том, "стоит ли" их покупать.
К решению
Решение состоит из довольно стандартных компонентов:
- Конвейер EtL. Маленькая t означает нормализацию, такую как преобразование строк в даты или распаковка вложенных структур. Это обрабатывается функциями dlt, написанными на Python.
- Слой трансформации, который берет исходные данные, загруженные моими функциями dlt, и создает необходимые таблицы, обрабатываемые dbt.
- Из-за сложности удаления дубликатов мне пришлось добавить человеческий элемент для подтверждения удаления дубликатов в Google Sheets.
Эти элементы отражены на диаграмме ниже и более подробно разъясн ены далее в статье:
Загрузка данных
Для загрузки я использую несколько источников:
Во-первых, я загружаю объявления о домах из API Idealista, доступного через freemium-обертку API Dojo. Конвейер dlt, который я создал для загрузки, находится в этом репозитории.
После начального этапа трансформации (описанного в следующем разделе) данные без дубликатов загружаются в BigQuery, где я могу запрашивать их из клиента Google Sheets и вручную проверять удаление дубликатов.
Когда я доволен результатами, я использую готовый источник-коннектор dlt для Sheets, чтобы вернуть данные в BigQuery, как определено здесь.
Трансформация данных
Для трансформации я использую свое любимое решение, dbt Core. Для запуска и оркестрации dbt на Cloud Functions я использую dbt Core runner от dlt. Преимущество runner в этом контексте заключается в том, что я могу повторно использовать ту же настройку учетных данных, вместо создания отдельного файла profiles.yml.
Это пакет, который я создал: https://github.com/euanjohnston-dev/idealista_dbt_pipeline
Подготовка конвейера к производству
Чтобы сделать конвейер более "готовым к производству", я внес некоторые улучшения:
- Использование хранилища учетных данных вместо жесткого кодирования паролей, в данном случае Google Secret Manager.
- Получение уведомлений о запуске конвейера и его результате. Для этого я отправлял данные в Slack через декоратор dlt, который публикует ошибку в случае сбоя и метаданные в случае успеха.
from dlt.common.runtime.slack import send_slack_message
def notify_on_completion(hook):
def decorator(func):
def wrapper(*args, **kwargs):
try:
load_info = func(*args, **kwargs)
message = f"Function {func.__name__} completed successfully. Load info: {load_info}"
send_slack_message(hook, message)
return load_info
except Exception as e:
message = f"Function {func.__name__} failed. Error: {str(e)}"
send_slack_message(hook, message)
raise
return wrapper
return decorator
Результат
Результатом стала, прежде всего, визуализация, подчеркивающая уникальные объекты, доступные в моей конкретной области поиска. Карта, показанная слева на странице, дает живой обзор местоположения, количества дубликатов (размер пузыря) и цены (цвет пузыря), которые, среди прочих функций, можно фильтровать с помощью ползунков справа. Это представляет собой гораздо более упорядоченное решение, с помощью которого можно наблюдать за фактическим доступным инвентарем.
Дополнительные диаграммы подчеркивают дополнительные метрики, которые — теперь, когда удаление дубликатов завершено — могут быть точно измерены, включая, что наиболее важно, развитие со временем "средней цены/квадратный метр" и тех объектов, которые, как предполагается, были проданы.
Следующие шаги
Эта версия была в основном о создании базы, с которой можно анализировать объекты для моего личного использования.
В плане дальнейшего развития, которое могло бы произойти, у меня был интерес от людей, которые хотели бы запустить решение в своей собственной целевой области.
Для того чтобы это работало в масштабе, мне нужен более надежный метод для работы с атрибуцией дубликатов, что является сложной проблемой, так как агентства недвижимости намеренно изменяют такие детали, как количество комнат или площадь.
Возможно, это проблема, которую ML или GPT могли бы решить так же хорошо, как и человек, учитывая ограниченные доступные варианты.
Извлеченные уроки и заключение
Проблема с данными сама по себе открыла глаза на рынок недвижимости. Это запутанный рынок, полный неизвестностей и шума, что добавляет значительный риск покупки для покупателей впервые.
Что касается инструментов, было удивительно, как быстро все было настроено. dlt хорошо интегрируется с dbt и позволяет быстро и просто загружать данные, делая этот проект проще, чем я думал.
dlt
Плюсы:
- Как большой поклонник dbt, я люблю, как эти два решения дополняют друг друга. dlt автоматически обрабатывает очистку и нормализацию данных, так что я могу сосредоточиться на их курировании и моделировании в dbt. Хотя автоматическая распаковка оставляет некоторые небольшие корректировки для аналитического инженера, это гораздо лучше, чем очистка и типизация json в базе данных или в пользовательском коде на Python.
- При создании моего первого тестового конвейера я использовал duckdb. Это было отличным введением в то, насколько просто начать, и предоставило надежную отправную точку перед разработкой чего-то для облака.
Минусы:
- У меня была небольшая заминка с коннектором Google Sheets, который предполагал аутентификацию oauth вместо моего желаемого sdk, но это было относительно легко исправить (явно указав GcpServiceAccountCredentials в файле init.py для источника).
- Использование как проверенного источника в коннекторе gsheets, так и создание собственного из конечных точек Rapid API казалось одинаково интуитивным. Однако я бы хотел больше документации о том, как запускать эти 2 конвейера в одном скрипте с конвейером dbt.
dbt
Здесь без сюрпризов. Я разработал проект локально, и для развертывания в облачных функциях я внедрил учетные данные в dbt через dlt runner. Это означало, что я мог повторно использовать настройку, которую сделал для других конвейеров dlt.
def dbt_run():
# make an authenticated connection with dlt to the dwh
pipeline = dlt.pipeline(
pipeline_name='dbt_pipeline',
destination='bigquery', # credentials read from env
dataset_name='dbt'
)
# make a venv in case we have lib conflicts between dlt and current env
venv = dlt.dbt.get_venv(pipeline)
# package the pipeline, dbt package and env
dbt = dlt.dbt.package(pipeline, "dbt/property_analytics", venv=venv)
# and run it
models = dbt.run_all()
# show outcome
for m in models:
print(f"Model {m.model_name} materialized in {m.time} with status {m.status} and message {m.message}"