Дублирование паркета на S3 с использованием EMR и Apache Spark - PullRequest
0 голосов
/ 28 апреля 2020

Используя pyspark, мы создали эту структуру как паркетный раздел (с хранилищем s3)

 - entity=entity
    - data_version=1.0
       - hub_transaction_date=2020-04-01

Мы делаем некоторые вещи, которые должны обновляться со временем, в данном случае , раздел hub_transaction_date будет загружен снова (только необходимые даты), и я снова напишу, используя опцию перезаписи, и на этом этапе возникают некоторые проблемы.

Иногда, просто иногда, когда эта операция завершается, раздел дублируется безо всякой причины, и когда я пытаюсь снова прочитать раздел, я получаю эту ошибку:

: java.lang.AssertionError: assertion failed: Conflicting partition column names detected:

    Partition column name list #0: hub_transaction_date
    Partition column name list #1: hub_transaction_date, hub_transaction_date

Я установил опцию как «dynamici c» на sparkconfig. И не существует, и поле дублируется.

Код выглядит следующим образом:


spark = SparkSession \
        .builder \
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
        .config("spark.sql.shuffle.partitions", 800) \
        .config("spark.default.parallelism", 800) \
        .appName(APPLICATION_NAME) \
        .getOrCreate()

path_update = "path"

schema = StructType([])

df_update = spark.read.schema(schema).parquet(path_update)

if df_update.count() > 0:

    df_update   = df_update.withColumn("hub_transaction_date", df_delete["my_date"].cast("date"))
    df_distinct = df_update.select(df_update.hub_transaction_date).distinct()

    partitions = []

    # here i read only the parittion that needs to be update
    for row in df_distinct.rdd.collect():
            hub_transaction = row.hub_transaction_date.strftime("%Y-%m-%d")
            partitions.append(ORIGINAL_PATH + "hub_transaction_date=" + hub_transaction) 

    df_updated = spark.read.schema(schema) \
                .option("basePath", ORIGINAL_PATH) \
                .parquet(*partitions) 

    // do some magic stuff

    df_updated.repartition("hub_transaction_date") \
                .write.partitionBy("hub_transaction_date").mode("overwrite") \
                .parquet(ORIGINAL_PATH)     

Примечание:

Я напечатал структуру схемы, и поле не дублируется.

...