Чтобы предотвратить дублирование на s3, вам необходимо загрузить данные из места назначения и отфильтровать существующие записи перед сохранением:
val deltaDf = newDataDf.alias("new")
.join(existingDf.alias("existing"), "id", "left_outer")
.where(col("existing.id").isNull)
.select("new.*")
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
Однако этот метод не перезаписывает обновленные записи.
Другим вариантом является сохранение обновленных записей также с некоторым полем updated_at
, которое может использоваться нижестоящими потребителями для получения последних значений.
Вы также можете рассмотреть возможность выгрузки набора данных в отдельную папку каждыйвремя выполнения задания (т. е. каждый день у вас полный дамп данных в data/dataset_date=<year-month-day>
)
import org.apache.spark.sql.functions._
val datedDf = sourceDf.withColumn("dataset_date", current_date())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path,
"partitionKeys" -> Array("dataset_date")
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(datedDf, glueContext))