Настройка AWS Glue
Некоторая базовая функциональность может быть ограничена. Если вы заинтересованы в том, чтобы внести вклад, ознакомьтесь с исходным кодом каждого репозитория, перечисленного ниже.
- Поддерживается: Community
- Авторы: Benjamin Menuet, Moshir Mikael, Armando Segnini and Amine El Mallem
- Репозиторий GitHub: aws-samples/dbt-glue
- Пакет PyPI:
dbt-glue - Канал в Slack: #db-glue
- Поддерживаемая версия dbt Core: v0.24.0 и новее
- Поддержка dbt: Not Supported
- Минимальная версия платформы данных: Glue 2.0
Установка dbt-glue
Установите адаптер с помощью pip. До версии 1.8 установка адаптера автоматически устанавливала dbt-core и любые дополнительные зависимости. Начиная с 1.8 установка адаптера не устанавливает dbt-core автоматически. Это потому, что версии адаптеров и dbt Core были развязаны, и мы больше не хотим перезаписывать существующие установки dbt-core.
Используйте следующую команду для установки:
python -m pip install dbt-core dbt-glue
Настройка dbt-glue
Конфигурацию, специфичную для AWS Glue, см. на странице настроек AWS Glue.
Для получения дополнительной (и, скорее всего, более актуальной) информации см. README
Способы подключения
Настройка AWS‑профиля для Glue Interactive Session
Для интерактивных сессий используются два IAM‑принципала.
- Клиентский принципал: принципал (пользователь или роль), который вызывает AWS API (Glue, Lake Formation, Interactive Sessions) с локального клиента. Этот принципал настраивается в AWS CLI и, как правило, совпадает с основным.
- Сервисная роль: IAM‑роль, которую AWS Glue использует для выполнения вашей сессии. Это та же роль, что используется в AWS Glue ETL.
Прочитайте эту документацию, чтобы настроить эти принципалы.
Ниже приведена политика с минимально необходимыми правами, позволяющая использовать все возможности адаптера dbt-glue.
Пожалуйста, обновите значения переменных между <>. Ниже приведены пояснения к этим аргументам:
| Loading table... |
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Read_and_write_databases",
"Action": [
"glue:SearchTables",
"glue:BatchCreatePartition",
"glue:CreatePartitionIndex",
"glue:DeleteDatabase",
"glue:GetTableVersions",
"glue:GetPartitions",
"glue:DeleteTableVersion",
"glue:UpdateTable",
"glue:DeleteTable",
"glue:DeletePartitionIndex",
"glue:GetTableVersion",
"glue:UpdateColumnStatisticsForTable",
"glue:CreatePartition",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:GetTables",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"glue:UpdateColumnStatisticsForPartition",
"glue:CreateDatabase",
"glue:BatchDeleteTableVersion",
"glue:BatchDeleteTable",
"glue:DeletePartition",
"glue:GetUserDefinedFunctions",
"lakeformation:ListResources",
"lakeformation:BatchGrantPermissions",
"lakeformation:ListPermissions",
"lakeformation:GetDataAccess",
"lakeformation:GrantPermissions",
"lakeformation:RevokePermissions",
"lakeformation:BatchRevokePermissions",
"lakeformation:AddLFTagsToResource",
"lakeformation:RemoveLFTagsFromResource",
"lakeformation:GetResourceLFTags",
"lakeformation:ListLFTags",
"lakeformation:GetLFTag",
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:catalog",
"arn:aws:glue:<region>:<AWS Account>:table/<dbt output database>/*",
"arn:aws:glue:<region>:<AWS Account>:database/<dbt output database>"
],
"Effect": "Allow"
},
{
"Sid": "Read_only_databases",
"Action": [
"glue:SearchTables",
"glue:GetTableVersions",
"glue:GetPartitions",
"glue:GetTableVersion",
"glue:GetTables",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"lakeformation:ListResources",
"lakeformation:ListPermissions"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:table/<dbt source database>/*",
"arn:aws:glue:<region>:<AWS Account>:database/<dbt source database>",
"arn:aws:glue:<region>:<AWS Account>:database/default",
"arn:aws:glue:<region>:<AWS Account>:database/global_temp"
],
"Effect": "Allow"
},
{
"Sid": "Storage_all_buckets",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<dbt output bucket>",
"arn:aws:s3:::<dbt source bucket>"
],
"Effect": "Allow"
},
{
"Sid": "Read_and_write_buckets",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<dbt output bucket>"
],
"Effect": "Allow"
},
{
"Sid": "Read_only_buckets",
"Action": [
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::<dbt source bucket>"
],
"Effect": "Allow"
}
]
}
Настройка локального окружения
Поскольку адаптеры dbt и dbt-glue совместимы с Python версии 3.9 или выше, проверьте версию Python:
$ python3 --version
Настройте виртуальное окружение Python, чтобы изолировать версии пакетов и зависимости кода:
$ sudo yum install git
$ python3 -m venv dbt_venv
$ source dbt_venv/bin/activate
$ python3 -m pip install --upgrade pip
Настройте последнюю версию AWS CLI:
$ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install
Установите пакет boto3:
$ sudo yum install gcc krb5-devel.x86_64 python3-devel.x86_64 -y
$ pip3 install —upgrade boto3
Установите пакет:
$ pip3 install dbt-glue
Пример конфигурации
type: glue
query-comment: This is a glue dbt example
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: us-east-1
workers: 2
worker_type: G.1X
idle_timeout: 10
schema: "dbt_demo"
session_provisioning_timeout_in_seconds: 120
location: "s3://dbt_demo_bucket/dbt_demo_data"
В таблице ниже описаны все доступные опции.
| Loading table... |
Конфигурации
Настройка таблиц
При материализации модели как table вы можете указать несколько дополнительных конфигураций, специфичных для плагина dbt‑spark, в дополнение к стандартным настройкам моделей.
| Loading table... |
Инкрементальные модели
dbt стремится предоставлять удобные и интуитивно понятные абстракции моделирования с помощью встроенных конфигураций и материализаций.
Поэтому плагин dbt‑glue активно использует конфигурацию incremental_strategy. Эта настройка определяет, как именно будет выполняться инкрементальная материализация при запусках после первого. Возможны три значения:
append(по умолчанию): вставляет новые записи, не обновляя и не перезаписывая существующие данные.insert_overwrite: если указанpartition_by, перезаписывает разделы таблицы новыми данными. Еслиpartition_byне указан, перезаписывает всю таблицу.merge(только Apache Hudi и Apache Iceberg): сопоставляет записи поunique_key, обновляет старые записи и вставляет новые. (Еслиunique_keyне указан, все новые данные просто вставляются, аналогичноappend.)
Каждая из этих стратегий имеет свои плюсы и минусы, которые рассмотрены ниже. Как и любую конфигурацию модели, incremental_strategy можно задать в dbt_project.yml или в блоке config() внутри файла модели.
Примечания:
Стратегия по умолчанию — insert_overwrite
Стратегия append
При использовании стратегии append dbt выполняет оператор insert into со всеми новыми данными. Преимущество этой стратегии в её простоте и универсальности — она работает на всех платформах, для всех типов файлов, способов подключения и версий Apache Spark. Однако эта стратегия не может обновлять, перезаписывать или удалять существующие данные, поэтому для многих источников данных возможна вставка дубликатов.
Исходный код
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}
-- All rows returned by this query will be appended to the existing table
select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
Код выполнения
create temporary view spark_incremental__dbt_tmp as
select * from analytics.events
where event_ts >= (select max(event_ts) from {{ this }})
;
insert into table analytics.spark_incremental
select `date_day`, `users` from spark_incremental__dbt_tmp
Стратегия insert_overwrite
Эта стратегия наиболее эффективна при использовании вместе с параметром partition_by в конфигурации модели. dbt выполнит атомарный оператор insert overwrite, который динамически заменяет все разделы, затронутые запросом. При использовании этой стратегии обязательно выбирайте все релевантные данные для каждого раздела.
Если partition_by не указан, стратегия insert_overwrite атомарно заменит всё содержимое таблицы, перезаписав существующие данные только новыми записями. Схема колонок при этом сохраняется. В некоторых случаях это может быть полезно, так как минимизирует простой при перезаписи данных. По смыслу операция аналогична truncate + insert в других базах данных. Для атомарной замены таблиц в формате Delta используйте материализацию table (которая выполняет create or replace).
Исходный код
{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}
/*
Every partition returned by this query will be overwritten
when this model runs
*/
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
date_day,
count(*) as users
from events
group by 1
Код выполнения
create temporary view spark_incremental__dbt_tmp as
with new_events as (
select * from analytics.events
where date_day >= date_add(current_date, -1)
)
select
date_day,
count(*) as users
from events
group by 1
;
insert overwrite table analytics.spark_incremental
partition (date_day)
select `date_day`, `users` from spark_incremental__dbt_tmp
Указывать insert_overwrite в качестве стратегии необязательно, так как она используется по умолчанию, если стратегия не задана.
Стратегия merge
Совместимость:
- Hudi : OK
- Delta Lake : OK
- Iceberg : OK
- Таблицы под управлением Lake Formation : в процессе
NB:
-
Для Glue 3 необходимо настроить Glue connectors.
-
Для Glue 4 используйте параметр
datalake_formatsв profile.yml.
При использовании коннектора убедитесь, что ваша IAM‑роль содержит следующие политики:
{
"Sid": "access_to_connections",
"Action": [
"glue:GetConnection",
"glue:GetConnections"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:catalog",
"arn:aws:glue:<region>:<AWS Account>:connection/*"
],
"Effect": "Allow"
}
и что к роли прикреплена управляемая политика AmazonEC2ContainerRegistryReadOnly.
Также убедитесь, что вы выполнили инструкции по начальной настройке, описанные здесь.
В этом посте блога также объясняется, как настроить и использовать Glue Connectors.
Hudi
Примечания по использованию: стратегия инкремента merge с Hudi требует:
- Добавить
file_format: hudiв конфигурацию таблицы - Добавить
datalake_formatsв профиль:datalake_formats: hudi- Либо добавить подключение в профиль:
connections: name_of_your_hudi_connector
- Либо добавить подключение в профиль:
- Добавить Kryo serializer в Interactive Session Config (в профиле):
conf: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
dbt выполнит атомарный merge‑statement, который выглядит почти идентично поведению merge по умолчанию в Snowflake и BigQuery. Если указан unique_key (рекомендуется), dbt обновит старые записи значениями из новых записей, которые совпадают по ключевой колонке. Если unique_key не указан, dbt не будет использовать критерии сопоставления и просто вставит все новые записи (аналогично стратегии append).
Пример конфигурации профиля
test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
conf: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
datalake_formats: hudi
Пример исходного кода
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='user_id',
file_format='hudi',
hudi_options={
'hoodie.datasource.write.precombine.field': 'eventtime',
}
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
Delta
Вы также можете использовать Delta Lake, чтобы иметь возможность применять merge к таблицам.
Примечания по использованию: стратегия инкремента merge с Delta требует:
- Добавить
file_format: deltaв конфигурацию таблицы - Добавить
datalake_formatsв профиль:datalake_formats: delta- Либо добавить подключение в профиль:
connections: name_of_your_delta_connector
- Либо добавить подключение в профиль:
- Добавить следующую конфигурацию в Interactive Session Config (в профиле):
conf: "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Athena: Athena по умолчанию не совместима с delta‑таблицами, но вы можете настроить адаптер так, чтобы он создавал таблицы Athena поверх вашей delta‑таблицы. Для этого нужно настроить в профиле следующие опции:
- Для Delta Lake 2.1.0, нативно поддерживаемого в Glue 4.0:
extra_py_files: "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-2.1.0.jar" - Для Delta Lake 1.0.0, нативно поддерживаемого в Glue 3.0:
extra_py_files: "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-1.0.0.jar" delta_athena_prefix: "the_prefix_of_your_choice"- Если ваша таблица партиционирована, добавление новых партиций не происходит автоматически — после добавления каждой новой партиции нужно выполнять
MSCK REPAIR TABLE your_delta_table
Пример конфигурации профиля
test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
datalake_formats: delta
conf: "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
extra_py_files: "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-2.1.0.jar"
delta_athena_prefix: "delta"
Пример исходного кода
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='user_id',
partition_by=['dt'],
file_format='delta'
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen,
current_date() as dt
from events
group by 1
Iceberg
Примечания по использованию: стратегия инкремента merge с Iceberg требует:
- Прикрепить managed policy AmazonEC2ContainerRegistryReadOnly к вашей execution role:
- Добавить следующую политику к вашей execution role, чтобы включить commit locking в таблице DynamoDB (подробнее здесь). Обратите внимание: таблица DynamoDB, указанная в поле resource этой политики, должна совпадать с той, что указана в ваших dbt profiles (
--conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable). По умолчанию эта таблица называетсяmyGlueLockTableи создаётся автоматически (с On-Demand Pricing) при запуске модели dbt-glue с инкрементальной материализацией и форматом файла Iceberg. Если вы хотите назвать таблицу иначе или создать свою таблицу, не позволяя Glue делать это за вас, укажите параметрiceberg_glue_commit_lock_tableс именем вашей таблицы (например,MyDynamoDbTable) в профиле dbt.
iceberg_glue_commit_lock_table: "MyDynamoDbTable"
- Последний коннектор для Iceberg в AWS Marketplace использует версию 0.14.0 для Glue 3.0 и версию 1.2.1 для Glue 4.0. В Glue 4.0 Kryo serialization падает при записи Iceberg, поэтому вместо этого используйте
"org.apache.spark.serializer.JavaSerializer"дляspark.serializer. Подробнее здесь.
Убедитесь, что вы обновили conf, добавив --conf spark.sql.catalog.glue_catalog.lock.table=<YourDynamoDBLockTableName>, и что вы заменили IAM‑права ниже на корректное имя вашей таблицы.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CommitLockTable",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:BatchGetItem",
"dynamodb:BatchWriteItem",
"dynamodb:ConditionCheckItem",
"dynamodb:PutItem",
"dynamodb:DescribeTable",
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:Query",
"dynamodb:UpdateItem"
],
"Resource": "arn:aws:dynamodb:<AWS_REGION>:<AWS_ACCOUNT_ID>:table/myGlueLockTable"
}
]
}
- Добавить
file_format: Icebergв конфигурацию таблицы - Добавить
datalake_formatsв профиль:datalake_formats: iceberg- Либо добавить connections в профиль:
connections: name_of_your_iceberg_connector(- Для Athena версии 3:
- Адаптер совместим с Iceberg Connector из AWS Marketplace с Fulfillment option Glue 3.0 и версией ПО 0.14.0 (11 октября 2022)
- Последний коннектор для Iceberg в AWS Marketplace использует версию 0.14.0 для Glue 3.0 и версию 1.2.1 для Glue 4.0. В Glue 4.0 Kryo serialization падает при записи Iceberg, поэтому вместо этого используйте "org.apache.spark.serializer.JavaSerializer" для spark.serializer. Подробнее здесь
- Для Athena версии 2: адаптер совместим с Iceberg Connector из AWS Marketplace с Fulfillment option Glue 3.0 и версией ПО 0.12.0-2 (14 февраля 2022)
- Для Athena версии 3:
- Либо добавить connections в профиль:
- Добавить следующую конфигурацию в Interactive Session Config (в профиле):
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.sql.warehouse=s3://<your-bucket-name>
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
--conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager
--conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
- Для Glue 3.0 вместо этого установите
spark.sql.catalog.glue_catalog.lock-implвorg.apache.iceberg.aws.glue.DynamoLockManager
dbt выполнит атомарный merge‑statement, который выглядит почти идентично поведению merge по умолчанию в Snowflake и BigQuery. Чтобы выполнить merge, нужно указать unique_key, иначе операция завершится ошибкой. Этот ключ нужно указывать в формате Python‑списка; он может содержать несколько имён колонок, чтобы создать составной (composite) unique_key.
Примечания
- При использовании custom_location в Iceberg избегайте завершающего слэша. Добавление завершающего слэша приводит к некорректной обработке location и проблемам при чтении данных движками запросов, например Trino. Проблема должна быть исправлена для Iceberg версии > 0.13. Связанный issue на GitHub — здесь.
- Iceberg также поддерживает стратегии
insert_overwriteиappend. - Параметр
warehouseвconfобязателен, но переопределяется значениемlocationв профиле адаптера илиcustom_locationв конфигурации модели. - По умолчанию у этой материализации
iceberg_expire_snapshotsустановлен в 'True'. Если вам нужно сохранять исторические изменения для аудита, задайте:iceberg_expire_snapshots='False'. - Сейчас из‑за некоторых внутренних особенностей dbt iceberg‑каталог, который используется внутри при запуске glue interactive sessions с dbt-glue, имеет захардкоженное имя
glue_catalog. Это имя — алиас, указывающий на AWS Glue Catalog, но он специфичен для каждой сессии. Если вы хотите работать с данными в другой сессии без dbt-glue (например, из Glue Studio notebook), вы можете настроить другой алиас (то есть другое имя для Iceberg Catalog). Чтобы проиллюстрировать это, в конфигурационном файле можно задать:
--conf spark.sql.catalog.RandomCatalogName=org.apache.iceberg.spark.SparkCatalog
Затем запустите сессию в AWS Glue Studio Notebook со следующей конфигурацией:
--conf spark.sql.catalog.AnotherRandomCatalogName=org.apache.iceberg.spark.SparkCatalog
В обоих случаях базовым каталогом будет AWS Glue Catalog, уникальный для вашего AWS Account и Region, и вы сможете работать с ровно теми же данными. Также убедитесь, что если вы меняете имя алиаса Glue Catalog, вы меняете его во всех остальных --conf, где он используется:
--conf spark.sql.catalog.RandomCatalogName=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.RandomCatalogName.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
...
--conf spark.sql.catalog.RandomCatalogName.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager
- Полная справка по
table_propertiesдоступна здесь. - Таблицы Iceberg нативно поддерживаются Athena. Поэтому вы можете выполнять запросы к таблицам, созданным и обслуживаемым адаптером dbt-glue, из Athena.
- Инкрементальная материализация с форматом файла Iceberg поддерживает dbt snapshot. Вы можете запустить команду dbt snapshot, которая делает запрос к таблице Iceberg, и создать для неё snapshot в стиле dbt.
Пример конфигурации профиля
test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
datalake_formats: iceberg
conf: --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.warehouse=s3://aws-dbt-glue-datalake-1234567890-eu-west-1/dbt_test_project --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Пример исходного кода
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['user_id'],
file_format='iceberg',
iceberg_expire_snapshots='False',
partition_by=['status']
table_properties={'write.target-file-size-bytes': '268435456'}
) }}
with new_events as (
select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}
)
select
user_id,
max(date_day) as last_seen
from events
group by 1
Пример исходного кода Iceberg Snapshot
Мониторинг Glue Interactive Session
Мониторинг — важная часть поддержания надёжности, доступности и производительности AWS Glue и ваших других AWS‑решений. AWS предоставляет инструменты мониторинга, которые можно использовать, чтобы наблюдать за AWS Glue, определить нужное количество workers для вашей Glue Interactive Session, сообщать, когда что-то идёт не так, и при необходимости автоматически предпринимать действия. AWS Glue предоставляет Spark UI, а также логи и метрики CloudWatch для мониторинга ваших AWS Glue jobs. Подробнее: Monitoring AWS Glue Spark jobs
Примечания по использованию: для мониторинга требуется:
- Добавить следующую IAM‑политику к вашей IAM role:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CloudwatchMetrics",
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition": {
"StringEquals": {
"cloudwatch:namespace": "Glue"
}
}
},
{
"Sid": "CloudwatchLogs",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:/aws-glue/*",
"arn:aws:s3:::bucket-to-write-sparkui-logs/*"
]
}
]
}
- Добавить параметры мониторинга в Interactive Session Config (в профиле). Подробнее см. Job parameters used by AWS Glue
Пример конфигурации профиля
test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
default_arguments: "--enable-metrics=true, --enable-continuous-cloudwatch-log=true, --enable-continuous-log-filter=true, --enable-spark-ui=true, --spark-event-logs-path=s3://bucket-to-write-sparkui-logs/dbt/"
Если вы хотите использовать Spark UI, вы можете запустить Spark history server с помощью шаблона AWS CloudFormation, который размещает сервер на EC2‑инстансе, или запустить локально с помощью Docker. Подробнее см. Launching the Spark history server
Включение AWS Glue Auto Scaling
Auto Scaling доступен начиная с AWS Glue версии 3.0 и выше. Подробнее см. в посте AWS: "Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark"
При включённом Auto Scaling вы получаете следующие преимущества:
-
AWS Glue автоматически добавляет и удаляет workers из кластера в зависимости от параллелизма на каждом этапе или microbatch выполнения job.
-
Не нужно экспериментировать и решать, сколько workers назначать для ваших AWS Glue Interactive sessions.
-
После того как вы выберете максимальное число workers, AWS Glue подберёт ресурсы нужного размера для нагрузки.
-
Вы можете увидеть, как меняется размер кластера во время выполнения Glue Interactive sessions, посмотрев метрики CloudWatch. Подробнее см. Monitoring your Glue Interactive Session.
Примечания по использованию: для AWS Glue Auto Scaling требуется:
- Установить AWS Glue версии 3.0 или выше.
- Задать максимальное число workers (если Auto Scaling включён, параметр
workersзадаёт максимальное число workers) - Указать параметр
--enable-auto-scaling=trueв Glue Interactive Session Config (в профиле). Подробнее см. Job parameters used by AWS Glue
Пример конфигурации профиля
test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "3.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
default_arguments: "--enable-auto-scaling=true"
Доступ к Glue catalog в другом AWS account
Во многих случаях вам может понадобиться запускать dbt jobs, чтобы читать данные из другого AWS account.
Ознакомьтесь со ссылкой https://repost.aws/knowledge-center/glue-tables-cross-accounts, чтобы настроить политики доступа в source и target accounts.
Добавьте "spark.hadoop.hive.metastore.glue.catalogid=<AWS-ACCOUNT-ID>" в conf в dbt profile — так вы сможете иметь несколько outputs для каждого account, к которому у вас есть доступ.
Примечание: кросс‑аккаунтный доступ должен быть в пределах одного и того же AWS Region.
Пример конфигурации профиля
test_project:
target: dev
outputsAccountB:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "3.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
conf: "--conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
--conf spark.hadoop.hive.metastore.glue.catalogid=<TARGET-AWS-ACCOUNT-ID-B>"
Сохранение описаний моделей
Поддерживается сохранение документации на уровне relations. Для получения дополнительной информации о настройке сохранения документации см. документацию.
Когда опция persist_docs настроена соответствующим образом, вы сможете
видеть описания моделей в поле Comment вывода команды describe [table] extended
или show table extended in [database] like '*'.
Всегда schema, никогда database
Apache Spark использует термины «schema» и «database» как взаимозаменяемые. dbt же
понимает database как уровень, находящийся выше, чем schema. Поэтому при работе с dbt-glue
вам никогда не следует использовать или задавать database ни в конфигурации узлов, ни в целевом профиле.
Если вы хотите управлять схемой/базой данных, в которой dbt будет материализовывать модели,
используйте только конфигурацию schema и макрос generate_schema_name.
Подробнее см. документацию dbt о пользовательских схемах.
Интеграция с AWS Lake Formation
Адаптер поддерживает управление тегами AWS Lake Formation, позволяя ассоциировать существующие теги, определённые вне dbt-glue, с объектами базы данных, создаваемыми dbt-glue (database, table, view, snapshot, incremental models, seeds).
- Вы можете включать или отключать управление lf‑тегами через конфигурацию на уровне модели и dbt‑project (по умолчанию отключено)
- Если включено, lf‑теги будут обновляться при каждом запуске dbt. Поддерживаются конфигурации lf‑тегов на уровне таблиц и на уровне колонок.
- Вы можете указать, что хотите удалить существующие теги Lake Formation на уровне базы данных, таблицы или колонок, установив поле конфигурации
drop_existingв значение True (по умолчанию False, что означает сохранение существующих тегов) - Обратите внимание: если тег, который вы хотите связать с таблицей, не существует, выполнение dbt-glue завершится ошибкой
Адаптер также поддерживает фильтрацию ячеек данных (data cell filtering) AWS Lake Formation.
- Вы можете включать или отключать фильтрацию ячеек данных через конфигурацию на уровне модели и dbt‑project (по умолчанию отключено)
- Если включено,
data_cell_filtersбудут обновляться при каждом запуске dbt - Вы можете указать, что хотите удалить существующие фильтры ячеек данных таблицы, установив поле
drop_existingв значение True (по умолчанию False, что означает сохранение существующих фильтров) - Для реализации безопасности на уровне колонок вы можете использовать OR поле
excluded_columns_namesOR полеcolumns. Обратите внимание, что можно использовать только одно из них, но не оба одновременно. - По умолчанию, если вы не указываете ни
columns, ниexcluded_columns, dbt-glue не выполняет фильтрацию на уровне колонок и позволяет принципалу доступ ко всем колонкам.
Приведённая ниже конфигурация позволяет указанному принципалу (IAM‑пользователь lf-data-scientist) получать доступ к строкам, у которых customer_lifetime_value > 15, и ко всем указанным колонкам (customer_id, first_order, most_recent_order, number_of_orders):
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'column_names': ['customer_id', 'first_order', 'most_recent_order', 'number_of_orders']
}
},
}
}
Следующая конфигурация позволяет указанному принципалу (IAM‑пользователь lf-data-scientist) получать доступ к строкам, у которых customer_lifetime_value > 15, и ко всем колонкам, кроме указанной (first_name):
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'excluded_column_names': ['first_name']
}
},
}
}
Ниже приведены примеры того, как можно интегрировать управление LF‑тегами и фильтрацию ячеек данных в ваши конфигурации:
На уровне модели
Такой способ задания правил Lake Formation подходит, если вы хотите управлять политиками тегирования и фильтрации на уровне отдельных объектов. Помните, что он переопределяет любую конфигурацию, заданную на уровне dbt‑project.
{{ config(
materialized='incremental',
unique_key="customer_id",
incremental_strategy='append',
lf_tags_config={
'enabled': true,
'drop_existing' : False,
'tags_database':
{
'name_of_my_db_tag': 'value_of_my_db_tag'
},
'tags_table':
{
'name_of_my_table_tag': 'value_of_my_table_tag'
},
'tags_columns': {
'name_of_my_lf_tag': {
'value_of_my_tag': ['customer_id', 'customer_lifetime_value', 'dt']
}}},
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'excluded_column_names': ['first_name']
}
},
}
}
) }}
select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value,
current_date() as dt
from customers
left join customer_orders using (customer_id)
left join customer_payments using (customer_id)
На уровне dbt-project
Таким образом вы можете задать теги и политику фильтрации данных для определённого пути в вашем dbt‑проекте (например, models, seeds, models/model_group1 и т.д.). Это особенно полезно для seeds, для которых нельзя задать конфигурацию непосредственно в файле.
seeds:
+lf_tags_config:
enabled: true
tags_table:
name_of_my_table_tag: 'value_of_my_table_tag'
tags_database:
name_of_my_database_tag: 'value_of_my_database_tag'
models:
+lf_tags_config:
enabled: true
drop_existing: True
tags_database:
name_of_my_database_tag: 'value_of_my_database_tag'
tags_table:
name_of_my_table_tag: 'value_of_my_table_tag'
Тесты
Для выполнения функционального тестирования:
- Установите зависимости для разработки:
$ pip3 install -r dev-requirements.txt
- Установите пакет локально в dev‑режиме
$ python3 setup.py build && python3 setup.py install_lib
- Экспортируйте переменные окружения
$ export DBT_S3_LOCATION=s3://mybucket/myprefix
$ export DBT_ROLE_ARN=arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
- Запустите тесты
$ python3 -m pytest tests/functional
Для получения дополнительной информации см. документацию dbt о тестировании нового адаптера.
Ограничения
Поддерживаемая функциональность
Большая часть функциональности dbt Core поддерживается, однако некоторые возможности доступны только при использовании Apache Hudi.
Функции, доступные только с Apache Hudi:
- Инкрементальные обновления моделей по
unique_keyвместоpartition_by(см. стратегиюmerge)
Некоторые возможности dbt, доступные в core‑адаптерах, пока не поддерживаются в Glue:
- Сохранение описаний колонок в виде комментариев базы данных
- Snapshots