Используя pyspark, мы создали эту структуру как паркетный раздел (с хранилищем s3)
- entity=entity
- data_version=1.0
- hub_transaction_date=2020-04-01
Мы делаем некоторые вещи, которые должны обновляться со временем, в данном случае , раздел hub_transaction_date будет загружен снова (только необходимые даты), и я снова напишу, используя опцию перезаписи, и на этом этапе возникают некоторые проблемы.
Иногда, просто иногда, когда эта операция завершается, раздел дублируется безо всякой причины, и когда я пытаюсь снова прочитать раздел, я получаю эту ошибку:
: java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
Partition column name list #0: hub_transaction_date
Partition column name list #1: hub_transaction_date, hub_transaction_date
Я установил опцию как «dynamici c» на sparkconfig. И не существует, и поле дублируется.
Код выглядит следующим образом:
spark = SparkSession \
.builder \
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
.config("spark.sql.shuffle.partitions", 800) \
.config("spark.default.parallelism", 800) \
.appName(APPLICATION_NAME) \
.getOrCreate()
path_update = "path"
schema = StructType([])
df_update = spark.read.schema(schema).parquet(path_update)
if df_update.count() > 0:
df_update = df_update.withColumn("hub_transaction_date", df_delete["my_date"].cast("date"))
df_distinct = df_update.select(df_update.hub_transaction_date).distinct()
partitions = []
# here i read only the parittion that needs to be update
for row in df_distinct.rdd.collect():
hub_transaction = row.hub_transaction_date.strftime("%Y-%m-%d")
partitions.append(ORIGINAL_PATH + "hub_transaction_date=" + hub_transaction)
df_updated = spark.read.schema(schema) \
.option("basePath", ORIGINAL_PATH) \
.parquet(*partitions)
// do some magic stuff
df_updated.repartition("hub_transaction_date") \
.write.partitionBy("hub_transaction_date").mode("overwrite") \
.parquet(ORIGINAL_PATH)
Примечание:
Я напечатал структуру схемы, и поле не дублируется.