Как выполнить задание Glue ETL (преобразование в паркет из необработанной зоны в обработанную) для всех таблиц в необработанной зоне? - PullRequest
0 голосов
/ 08 мая 2019

В настоящее время я нахожусь в процессе автоматизации процесса приема данных из озера данных.У меня есть данные, поступающие в мою зону Raw (сегмент S3).В корзине у меня есть 27 папок, каждая из которых соответствует базе данных - каждая папка имеет х количество файлов CSV, каждая из которых соответствует таблице.У меня есть событие S3 (события создания всех объектов), запускающее лямбда-функцию, которая сканирует мою необработанную зону.Я могу видеть каждую таблицу успешно.После завершения я хотел бы создать задание ETL, которое перемещает данные в обработанной зоне, преобразуя их в паркет, однако, учитывая количество имеющихся у меня таблиц, я не хочу вручную создавать задание, определяющее каждую таблицу как «источник».».

Я продемонстрировал свои сервисы автоматизации, загрузив один файл CSV в мою необработанную зону, и сканер запустился, а затем запустилось задание ETL, преобразовав «таблицу сырых зон s3» в паркет и посадив его в обработанную зону.,Когда я отбросил вторую таблицу, сканер смог успешно распознать ее как новую таблицу в моей необработанной зоне, но в обработанной зоне он объединяет данные с первой схемой (даже если они совершенно разные).

Я ожидал бы следующее: 1) сканер распознает csv как таблицу 2) клей etl для преобразования файла в паркет 3) сканер распознает файл (ы) паркета как одну таблицу

Следующий код подчеркивает проблему, с которой я столкнулся - указанным источником данных является таблица (папка), и предполагается, что все в этой папке имеет одинаковую схему.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]

1 Ответ

0 голосов
/ 09 мая 2019

Создано задание ETL со следующей функцией для циклического перемещения по таблицам в моей базе данных и записи файла паркета в новую папку с тем же именем (чтобы я мог сканировать таблицу и использовать athena для запроса).

databaseName = 'DATABASE'
Tables = client.get_tables( DatabaseName = databaseName )
tableList = Tables ['TableList']
for table in tableList:
    tableName = table['Name']
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "DATABASE", table_name = tableName, transformation_ctx = "datasource0")
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://processed-45ah4xoyqr1b/Application1/"+tableName+"/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
...