Перейти к основному содержимому

Конфигурации Amazon Athena

Модели

Конфигурация таблицы

ПараметрПо умолчаниюОписание
external_locationNoneПолный путь S3, где сохраняется таблица. Работает только с инкрементальными моделями. Не работает с таблицами Hive, если ha установлено в true.
partitioned_byNoneСписок столбцов, по которым будет разбита таблица. В настоящее время ограничено 100 разделами.
bucketed_byNoneСписок столбцов для распределения данных по корзинам. Игнорируется при использовании Iceberg.
bucket_countNoneКоличество корзин для распределения данных. Этот параметр игнорируется при использовании Iceberg.
table_typeHiveТип таблицы. Поддерживает hive или iceberg.
haFalseСоздание таблицы с использованием метода высокой доступности. Доступно только для таблиц Hive.
formatParquetФормат данных для таблицы. Поддерживает ORC, PARQUET, AVRO, JSON и TEXTFILE.
write_compressionNoneТип сжатия для любого формата хранения, который поддерживает сжатие.
field_delimeterNoneУкажите пользовательский разделитель полей, который будет использоваться, когда формат установлен в TEXTFILE.
table_propertiesN/AСвойства таблицы, которые нужно добавить к таблице. Это только для Iceberg.
native_dropN/AОперации удаления отношений будут выполняться с помощью SQL, а не прямых вызовов API Glue. Вызовы S3 не будут выполняться для управления данными в S3. Данные в S3 будут очищены только для таблиц Iceberg. Подробнее см. в документации AWS. Операции DROP TABLE в Iceberg могут завершиться по тайм-ауту, если они длятся более 60 секунд.
seed_by_insertFalseСоздает начальные данные с использованием SQL-оператора вставки. Большие файлы начальных данных не могут превышать лимит Athena в 262144 байт.
force_batchFalseЗапуск создания таблицы напрямую в режиме пакетной вставки. Полезно, когда стандартное создание таблицы не удается из-за ограничения на разделы.
unique_tmp_table_suffixFalseЗаменяет суффикс "__dbt_tmp table" на уникальный UUID для инкрементальных моделей, использующих вставку с перезаписью в таблицах Hive.
temp_schemaNoneОпределяет схему для хранения временных операторов создания, используемых в инкрементальных запусках моделей. Схема будет создана в целевой базе данных моделей, если она не существует.
lf_tags_configNoneТеги AWS Lake Formation, которые нужно связать с таблицей и столбцами. Существующие теги будут удалены.
* enabled (default=False) указывает, включено ли управление тегами LF для модели
* tags словарь с тегами и их значениями для назначения модели
* tags_columns словарь с ключом тега, значением и списком столбцов, к которым они должны быть назначены
lf_inherited_tagsNoneСписок ключей тегов Lake Formation, которые должны быть унаследованы на уровне базы данных и не должны удаляться при назначении тех, что определены в ls_tags_config.
lf_grantsNoneКонфигурация грантов Lake Formation для фильтров data_cell.

Примеры конфигурации

models/schema.yml
{{
config(
materialized='incremental',
incremental_strategy='append',
on_schema_change='append_new_columns',
table_type='iceberg',
schema='test_schema',
lf_tags_config={
'enabled': true,
'tags': {
'tag1': 'value1',
'tag2': 'value2'
},
'tags_columns': {
'tag1': {
'value1': ['column1', 'column2'],
'value2': ['column3', 'column4']
}
},
'inherited_tags': ['tag1', 'tag2']
}
)
}}

Существуют некоторые ограничения и рекомендации, которые следует учитывать:

  • Конфигурации lf_tags и lf_tags_columns поддерживают только прикрепление тегов LF к соответствующим ресурсам.
  • Мы рекомендуем управлять разрешениями LF Tags вне dbt. Например, с помощью terraform или aws cdk.
  • Управление data_cell_filters не может быть автоматизировано вне dbt, потому что фильтр не может быть прикреплен к таблице, которая не существует. Как только вы enable эту конфигурацию, dbt установит все фильтры и их разрешения во время каждого запуска dbt. Такой подход сохраняет актуальное состояние конфигурации безопасности на уровне строк после каждого запуска dbt и применяет изменения, если они происходят: удаление, создание и обновление фильтров и их разрешений.
  • Любые теги, перечисленные в lf_inherited_tags, должны строго наследоваться на уровне базы данных и никогда не переопределяться на уровне таблицы и столбца.
  • В настоящее время dbt-athena не различает ассоциацию унаследованного тега и переопределение, сделанное ранее.
    • Например, если значение lf_tags_config переопределяет унаследованный тег в одном запуске, и это переопределение удаляется перед последующим запуском, предыдущее переопределение останется и больше не будет закодировано нигде (например, в Terraform, где унаследованное значение настроено, или в проекте DBT, где переопределение ранее существовало, но теперь исчезло).

Расположение таблицы

Сохраненное расположение таблицы определяется в порядке приоритета следующими условиями:

  1. Если external_location определено, используется это значение.
  2. Если s3_data_dir определено, путь определяется этим значением и s3_data_naming.
  3. Если s3_data_dir не определено, данные хранятся в s3_staging_dir/tables/.

Доступны следующие опции для s3_data_naming:

  • unique: {s3_data_dir}/{uuid4()}/
  • table: {s3_data_dir}/{table}/
  • table_unique: {s3_data_dir}/{table}/{uuid4()}/
  • schema_table: {s3_data_dir}/{schema}/{table}/
  • s3_data_naming=schema_table_unique: {s3_data_dir}/{schema}/{table}/{uuid4()}/

Чтобы установить s3_data_naming глобально в целевом профиле, перезапишите значение в конфигурации таблицы или установите значение для групп моделей в dbt_project.yml.

Примечание: Если вы используете рабочую группу с настроенным местоположением по умолчанию, s3_data_naming игнорирует любые настроенные корзины и использует местоположение, настроенное в рабочей группе.

Инкрементальные модели

Поддерживаются следующие стратегии инкрементальных моделей:

  • insert_overwrite (по умолчанию): Стратегия вставки с перезаписью удаляет пересекающиеся разделы из целевой таблицы, а затем вставляет новые записи из источника. Эта стратегия зависит от ключевого слова partitioned_by! dbt вернется к стратегии append, если разделы не определены.
  • append: Вставка новых записей без обновления, удаления или перезаписи существующих данных. Может быть дублирование данных (отлично подходит для логов или исторических данных).
  • merge: Условное обновление, удаление или вставка строк в таблицу Iceberg. Используется в сочетании с unique_key. Доступно только при использовании Iceberg.

При изменении схемы

Опция on_schema_change отражает изменения схемы в инкрементальных моделях. Значения, которые можно установить:

  • ignore (по умолчанию)
  • fail
  • append_new_columns
  • sync_all_columns

Чтобы узнать больше, обратитесь к Что делать, если столбцы моей инкрементальной модели изменяются.

Iceberg

Адаптер поддерживает материализацию таблиц для Iceberg.

Например:

{{ config(
materialized='table',
table_type='iceberg',
format='parquet',
partitioned_by=['bucket(user_id, 5)'],
table_properties={
'optimize_rewrite_delete_file_threshold': '2'
}
) }}

select 'A' as user_id,
'pi' as name,
'active' as status,
17.89 as cost,
1 as quantity,
100000000 as quantity_big,
current_date as my_date

Iceberg поддерживает распределение по корзинам как скрытые разделы. Используйте конфигурацию partitioned_by, чтобы добавить конкретные условия распределения по корзинам.

Iceberg поддерживает форматы таблиц PARQUET, AVRO и ORC для данных.

Поддерживаются следующие стратегии для использования Iceberg инкрементально:

  • append: Новые записи добавляются в таблицу (это может привести к дублированию).
  • merge: Выполняет обновление и вставку (и, возможно, удаление), где добавляются новые и существующие записи. Это доступно только с версией движка Athena 3.
    • unique_key(обязательно): Столбцы, которые определяют уникальную запись в исходной и целевой таблице.
    • incremental_predicates (необязательно): SQL-условия, которые позволяют настраивать соединительные условия в операторе слияния. Это помогает улучшить производительность за счет проталкивания предикатов в целевые таблицы.
    • delete_condition (необязательно): SQL-условие, которое определяет записи, которые должны быть удалены.
    • update_condition (необязательно): SQL-условие, которое определяет записи, которые должны быть обновлены.
    • insert_condition (необязательно): SQL-условие, которое определяет записи, которые должны быть вставлены.

incremental_predicates, delete_condition, update_condition и insert_condition могут включать любой столбец инкрементальной таблицы (src) или конечной таблицы (target). Имена столбцов должны иметь префикс src или target, чтобы избежать ошибки Column is ambiguous.

{{ config(
materialized='incremental',
table_type='iceberg',
incremental_strategy='merge',
unique_key='user_id',
incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"],
delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year",
format='parquet'
) }}

select 'A' as user_id,
'pi' as name,
'active' as status,
17.89 as cost,
1 as quantity,
100000000 as quantity_big,
current_date as my_date

Таблица высокой доступности (HA)

Текущая реализация материализации таблицы может привести к простоям, так как целевая таблица удаляется и создается заново. Для менее разрушительного поведения вы можете использовать конфигурацию ha в своих моделях с материализацией table. Она использует функцию версий таблиц каталога Glue, которая создает временную таблицу и меняет местами целевую таблицу с местоположением временной таблицы. Эта материализация доступна только для table_type=hive и требует использования уникальных местоположений. Для Iceberg высокая доступность является настройкой по умолчанию.

По умолчанию материализация сохраняет последние 4 версии таблицы, но вы можете изменить это, установив versions_to_keep.

{{ config(
materialized='table',
ha=true,
format='parquet',
table_type='hive',
partitioned_by=['status'],
s3_data_naming='table_unique'
) }}

select 'a' as user_id,
'pi' as user_name,
'active' as status
union all
select 'b' as user_id,
'sh' as user_name,
'disabled' as status

Известные проблемы HA

  • Может быть небольшой простой при смене таблицы с разделами на таблицу без них (и наоборот). Если требуется более высокая производительность, рассмотрите возможность использования распределения по корзинам вместо разделов.
  • По умолчанию Glue "дублирует" версии внутренне, поэтому последние две версии таблицы указывают на одно и то же местоположение.
  • Рекомендуется установить versions_to_keep >= 4, так как это предотвратит удаление старого местоположения.

Обновление каталога данных Glue

Вы можете сохранить описания на уровне столбцов и моделей в каталоге данных Glue как свойства таблицы Glue и параметры столбцов. Чтобы включить это, установите конфигурацию в true, как показано в следующем примере. По умолчанию сохранение документации отключено, но его можно включить для конкретных ресурсов или групп ресурсов по мере необходимости.

Например:

models:
- name: test_deduplicate
description: another value
config:
persist_docs:
relation: true
columns: true
meta:
test: value
columns:
- name: id
meta:
primary_key: true

Обратитесь к persist_docs для получения более подробной информации.

Снимки

Адаптер поддерживает материализацию снимков. Он поддерживает как стратегию временных меток, так и стратегию проверки. Чтобы создать снимок, создайте файл снимка в каталоге snapshots. Вам нужно будет создать этот каталог, если он еще не существует.

Стратегия временных меток

Обратитесь к Стратегия временных меток для получения подробной информации о том, как ее использовать.

Стратегия проверки

Обратитесь к Стратегия проверки для получения подробной информации о том, как ее использовать.

Жесткие удаления

Материализация также поддерживает аннулирование жестких удалений. Для получения информации о использовании обратитесь к Жесткие удаления.

Известные проблемы со снимками

  • Инкрементальные модели Iceberg - Синхронизация всех столбцов при изменении схемы. Столбцы, используемые для разделения, не могут быть удалены. С точки зрения dbt, единственный способ - это полное обновление инкрементальной модели.
  • Имена таблиц, схем и баз данных должны быть только в нижнем регистре.
  • Чтобы избежать потенциальных конфликтов, убедитесь, что dbt-athena-adapter не установлен в целевой среде.
  • Снимок не поддерживает удаление столбцов из исходной таблицы. Если вы удаляете столбец, убедитесь, что вы также удалили столбец из снимка. Другой обходной путь - это установить значение NULL для столбца в определении снимка, чтобы сохранить историю.

Интеграция AWS Lake Formation

Ниже описано, как адаптер реализует управление тегами AWS Lake Formation:

  • Включите управление тегами LF с помощью параметра lf_tags_config. По умолчанию оно отключено.
  • После включения теги LF обновляются при каждом запуске dbt.
  • Сначала все теги LF для столбцов удаляются, чтобы избежать проблем с наследованием.
  • Затем все избыточные теги LF удаляются из таблиц, и актуальные теги из конфигураций таблиц применяются.
  • Наконец, теги LF для столбцов применяются.

Важно понимать следующие моменты:

  • dbt не управляет lf-tags для баз данных
  • dbt не управляет разрешениями Lake Formation

Поэтому важно позаботиться об этом самостоятельно или использовать инструмент автоматизации, такой как terraform и AWS CDK. Для получения более подробной информации обратитесь к:

Модели на Python

Адаптер поддерживает модели на Python с использованием spark.

Предварительные требования

  • Рабочая группа с поддержкой Spark, созданная в Athena.
  • Роль выполнения Spark, предоставляющая доступ к Athena, Glue и S3.
  • Рабочая группа Spark добавлена в файл ~/.dbt/profiles.yml, и профиль, который будет использоваться, указан в dbt_project.yml.

Конфигурация таблицы, специфичная для Spark

КонфигурацияПо умолчаниюОписание
timeout43200Тайм-аут в секундах для каждого выполнения модели на Python. По умолчанию 12 часов/43200 секунд.
spark_encryptionFalseПри установке в true шифрует данные, хранящиеся локально Spark, и в транзите между узлами Spark.
spark_cross_account_catalogFalseПри использовании рабочей группы Spark Athena запросы по умолчанию могут выполняться только в каталогах на том же аккаунте AWS. Установка этого параметра в true позволит выполнять запросы к внешним каталогам, если вы хотите выполнить запрос к другому каталогу на внешнем аккаунте AWS.
Используйте синтаксис external_catalog_id/database.table, чтобы получить доступ к внешней таблице во внешнем каталоге (например, 999999999999/mydatabase.cloudfront_logs, где 999999999999 - это идентификатор внешнего каталога).
spark_requester_paysFalseПри установке в true, если корзина Amazon S3 настроена как requester pays, учетная запись пользователя, выполняющего запрос, оплачивает доступ к данным и сборы за передачу данных, связанные с запросом.

Заметки по Spark

  • Сессия создается для каждой уникальной конфигурации движка, определенной в моделях, которые являются частью вызова. Тайм-аут бездействия сессии установлен на 10 минут. В течение периода тайм-аута, если новый расчет (модель на Python для Spark) готов к выполнению и конфигурация движка совпадает, процесс будет повторно использовать ту же сессию.
  • Количество моделей на Python, выполняющихся одновременно, зависит от threads. Количество сессий, созданных для всего запуска, зависит от количества уникальных конфигураций движка и доступности сессий для поддержания параллельности потоков.
  • Для таблиц Iceberg рекомендуется использовать конфигурацию table_properties, чтобы установить format_version в 2. Это помогает поддерживать совместимость между таблицами Iceberg, созданными Trino, и теми, что созданы Spark.

Примеры моделей

import pandas as pd


def model(dbt, session):
dbt.config(materialized="table")

model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

return model_df

Известные проблемы в моделях на Python

  • Модели на Python не могут ссылаться на представления SQL Athena.
  • Вы можете использовать сторонние библиотеки Python; однако они должны быть включены в предустановленный список или импортированы вручную.
  • Модели на Python могут ссылаться или записывать только в таблицы с именами, соответствующими регулярному выражению: ^[0-9a-zA-Z_]+$. Spark не поддерживает дефисы или специальные символы, хотя Athena их поддерживает.
  • Инкрементальные модели не полностью используют возможности Spark. Они частично зависят от существующей логики на основе SQL, которая выполняется на Trino.
  • Материализации снимков не поддерживаются.
  • Spark может ссылаться только на таблицы в том же каталоге.
  • Для таблиц, созданных вне инструмента dbt, убедитесь, что поле местоположения заполнено, иначе dbt выдаст ошибку при создании таблицы.

Контракты

Адаптер частично поддерживает определения контрактов:

  • data_type поддерживается, но нуждается в корректировке для сложных типов. Типы должны быть указаны полностью (например, array<int>), хотя они не будут проверяться. Действительно, как рекомендует dbt, мы сравниваем только более широкий тип (array, map, int, varchar). Полное определение используется для проверки того, что типы данных, определенные в Athena, в порядке (предварительная проверка).
  • Адаптер не поддерживает ограничения, так как в Athena нет концепции ограничений.
0