Получение дубликатов в таблице, когда задание ETL разрушено дважды. Задание извлечения задания ETL из корзины RDS в S3 - PullRequest
0 голосов
/ 30 января 2019

Когда задание ETL запущено, оно выполняется должным образом, но, поскольку в таблице нет метки времени, оно дублирует данные, когда выполняется то же задание ETL. Как выполнить постановку и решить эту проблему с помощью Upsert или любого другого, к которому вы можете обратиться?ответ. Как мне избавиться от этой проблемы, решение, которое я нахожу, заключается в том, чтобы включить в нее метку времени или сделать постановку, или есть какой-то другой способ?

Ответы [ 2 ]

0 голосов
/ 31 января 2019

Чтобы предотвратить дублирование на s3, вам необходимо загрузить данные из места назначения и отфильтровать существующие записи перед сохранением:

val deltaDf = newDataDf.alias("new")
  .join(existingDf.alias("existing"), "id", "left_outer")
  .where(col("existing.id").isNull)
  .select("new.*")

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map(
      "path" -> path
    )),
    transformationContext = "save_to_s3"
    format = "avro"
  ).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))

Однако этот метод не перезаписывает обновленные записи.

Другим вариантом является сохранение обновленных записей также с некоторым полем updated_at, которое может использоваться нижестоящими потребителями для получения последних значений.

Вы также можете рассмотреть возможность выгрузки набора данных в отдельную папку каждыйвремя выполнения задания (т. е. каждый день у вас полный дамп данных в data/dataset_date=<year-month-day>)

import org.apache.spark.sql.functions._

val datedDf = sourceDf.withColumn("dataset_date", current_date())

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map(
      "path" -> path,
      "partitionKeys" -> Array("dataset_date")
    )),
    transformationContext = "save_to_s3"
    format = "avro"
  ).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
0 голосов
/ 30 января 2019

U может использовать overwrite при записи данных в s3.Он заменит исходные данные

...