AWS Glue добавляет дубликаты записей в каталог данных - PullRequest
0 голосов
/ 01 декабря 2018

Я обнаружил, что моя работа по склеиванию AWS добавляет дубликаты данных в мой каталог данных.У меня есть задание, которое читает JSON, дедуплирует его с помощью Spark SQL, а затем пытается сохранить его в каталоге данных.Но я, должно быть, делаю это неправильно, потому что каталог данных дублируется при каждом запуске задачи

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")

inputDF = inputGDF.toDF()

print(inputDF.first())

inputDF.createOrReplaceTempView("p_places")

# Processing the data to dedup it based on ID
filteredDF = spark.sql("""
  SELECT id, parentid, code, type, name, createdat, updatedat 
  FROM (
    SELECT 
      ROW_NUMBER() OVER (PARTITION BY ID ORDER BY updatedat DESC) ROW_NUM,
      id, parentid, code, type, name, createdat, updatedat
    FROM p_places
  )
  WHERE ROW_NUM = 1
""")

filteredGDF = DynamicFrame.fromDF(filteredDF, glueContext, "filteredGDF")

filteredDF.createOrReplaceTempView('p_places_2')
verification = spark.sql("""
    SELECT COUNT(id) FROM p_places_2 WHERE id = '12542'
""")
print("VERIFICATION:")
print(verification.first()) # Correctly output 1 (no dups)

outputGDF = glueContext.write_dynamic_frame.from_options(frame = filteredGDF, connection_type = "s3", connection_options = {"path": "s3://..."}, format = "parquet", transformation_ctx = "outputGDF")

job.commit()

Но когда я использую Athena для запроса данных, при каждом запуске появляется 1 дополнительная дублирующаяся строка.Это почему?Я подозреваю, что запись в файл паркета будет всегда добавлять?Как я могу решить это?

1 Ответ

0 голосов
/ 02 декабря 2018

Ваш код удаляет дубликаты только из входных данных.Однако, если вы не хотите, чтобы он находился в месте назначения, вам нужно загрузить эти существующие данные, а затем записать только новые записи:

existingGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://..."], "recurse": True}, format = "parquet", transformation_ctx="existingGDF")

newOnlyDF = filteredDF.alias("new")
  .join(existingDf.alias("existing"), col("ID"), "left_outer")
  .where(col("existing.ID").isNull())
  .select("new.*")

outputGDF = glueContext.write_dynamic_frame.from_options(frame = newOnlyDF, connection_type = "s3", connection_options = {"path": "s3://..."}, format = "parquet", transformation_ctx = "outputGDF")
...