В настоящее время я нахожусь в процессе автоматизации процесса приема данных из озера данных.У меня есть данные, поступающие в мою зону Raw (сегмент S3).В корзине у меня есть 27 папок, каждая из которых соответствует базе данных - каждая папка имеет х количество файлов CSV, каждая из которых соответствует таблице.У меня есть событие S3 (события создания всех объектов), запускающее лямбда-функцию, которая сканирует мою необработанную зону.Я могу видеть каждую таблицу успешно.После завершения я хотел бы создать задание ETL, которое перемещает данные в обработанной зоне, преобразуя их в паркет, однако, учитывая количество имеющихся у меня таблиц, я не хочу вручную создавать задание, определяющее каждую таблицу как «источник».».
Я продемонстрировал свои сервисы автоматизации, загрузив один файл CSV в мою необработанную зону, и сканер запустился, а затем запустилось задание ETL, преобразовав «таблицу сырых зон s3» в паркет и посадив его в обработанную зону.,Когда я отбросил вторую таблицу, сканер смог успешно распознать ее как новую таблицу в моей необработанной зоне, но в обработанной зоне он объединяет данные с первой схемой (даже если они совершенно разные).
Я ожидал бы следующее: 1) сканер распознает csv как таблицу 2) клей etl для преобразования файла в паркет 3) сканер распознает файл (ы) паркета как одну таблицу
Следующий код подчеркивает проблему, с которой я столкнулся - указанным источником данных является таблица (папка), и предполагается, что все в этой папке имеет одинаковую схему.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]