Я обнаружил, что моя работа по склеиванию AWS добавляет дубликаты данных в мой каталог данных.У меня есть задание, которое читает JSON, дедуплирует его с помощью Spark SQL, а затем пытается сохранить его в каталоге данных.Но я, должно быть, делаю это неправильно, потому что каталог данных дублируется при каждом запуске задачи
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
inputDF = inputGDF.toDF()
print(inputDF.first())
inputDF.createOrReplaceTempView("p_places")
# Processing the data to dedup it based on ID
filteredDF = spark.sql("""
SELECT id, parentid, code, type, name, createdat, updatedat
FROM (
SELECT
ROW_NUMBER() OVER (PARTITION BY ID ORDER BY updatedat DESC) ROW_NUM,
id, parentid, code, type, name, createdat, updatedat
FROM p_places
)
WHERE ROW_NUM = 1
""")
filteredGDF = DynamicFrame.fromDF(filteredDF, glueContext, "filteredGDF")
filteredDF.createOrReplaceTempView('p_places_2')
verification = spark.sql("""
SELECT COUNT(id) FROM p_places_2 WHERE id = '12542'
""")
print("VERIFICATION:")
print(verification.first()) # Correctly output 1 (no dups)
outputGDF = glueContext.write_dynamic_frame.from_options(frame = filteredGDF, connection_type = "s3", connection_options = {"path": "s3://..."}, format = "parquet", transformation_ctx = "outputGDF")
job.commit()
Но когда я использую Athena для запроса данных, при каждом запуске появляется 1 дополнительная дублирующаяся строка.Это почему?Я подозреваю, что запись в файл паркета будет всегда добавлять?Как я могу решить это?