AWS склеить и обновить дублирующие данные - PullRequest
0 голосов
/ 22 ноября 2018

Я использую AWS Glue для перемещения нескольких файлов в экземпляр RDS из S3.Каждый день я получаю новый файл в S3, который может содержать новые данные, но также может содержать запись, которую я уже сохранил, с некоторыми значениями обновлений.Если я запускаю работу несколько раз, я, конечно, получу дубликаты записей в базе данных.Вместо того, чтобы вставлять несколько записей, я хочу, чтобы Glue попытался обновить эту запись, если он заметит, что поле изменилось, каждая запись имеет уникальный идентификатор.Возможно ли это?

Ответы [ 3 ]

0 голосов
/ 25 ноября 2018

Я использовал INSERT в таблицу .... ON DUPLICATE KEY ... для UPSERT в Aurora RDS с запущенным двигателем mysql.Может быть, это будет ссылка для вашего варианта использования.Мы не можем использовать JDBC, поскольку в настоящее время поддерживаются только режимы APPEND, OVERWRITE, ERROR.

Я не уверен в используемом вами механизме базы данных RDS, и ниже приведен пример для mysql UPSERTS.

Пожалуйста, смотрите эту ссылку, где я опубликовал решение, используя INSERT INTO TABLE..ON DUPLICATE KEY для mysql:

Ошибка при использовании INSERT INTO таблица ON DUPLICATE KEY с использованием массива цикла loop

0 голосов
/ 01 мая 2019

Я следовал аналогичному подходу, который Юрий предложил как второй вариант.Получить существующие данные, а также новые данные, а затем выполнить некоторую обработку, чтобы объединить их и записать в режиме перезаписи.Следующий код поможет вам понять, как решить эту проблему.

sc = SparkContext()
glueContext = GlueContext(sc)

#get your source data 
src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl)
src_df =  src_data.toDF()


#get your destination data 
dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl)
dst_df =  dst_data.toDF()

#Now merge two data frames to remove duplicates
merged_df = dst_df.union(src_df)

#Finally save data to destination with OVERWRITE mode
merged_df.write.format('jdbc').options(   url = dest_jdbc_url, 
                                          user = dest_user_name,
                                          password = dest_password,
                                          dbtable = dest_tbl ).mode("overwrite").save()
0 голосов
/ 23 ноября 2018

К сожалению, нет элегантного способа сделать это с помощью клея.Если бы вы писали в Redshift, вы могли бы использовать postactions для реализации операции объединения Redshift .Однако это невозможно для других приемников jdbc (afaik).

В качестве альтернативы в вашем сценарии ETL вы можете загрузить существующие данные из базы данных, чтобы отфильтровать существующие записи перед сохранением.Однако, если ваша таблица БД велика, задание может занять некоторое время для ее обработки.

Другой подход - сначала записать в промежуточную таблицу режим «перезапись» (заменить существующие промежуточные данные), а затем сделать вызов.в БД через API для копирования новых записей только в итоговую таблицу.

...