можно ли перекрывать разделы при написании паркетных файлов - PullRequest
0 голосов
/ 12 июля 2020

У меня очень большой фрейм данных размером около 2 ТБ. Есть 2 столбца, по которым я могу их разделить: MODULE и DATE Если я разделю их на MODULE, каждый модуль может иметь одинаковые даты, например, MODULE A может иметь даты 2020-07-01 , 2020-07-02 и MODULE B могут иметь 2020-07-01 , 2020-07-05 и c. Мне нужно сначала разделить их по MODULE и выполнить несколько агрегатов и объединений, прежде чем я окончательно разделю и сохраню их по DATE. Я использую pyspark для кодирования.

После выполнения агрегации и присоединения с помощью MODULE я добавляю его в файл паркета и загружаю весь файл паркета во фрейм данных, а затем разбиваю его на разделы до ДАТЫ. Проблема в том, что искровое задание завершается из-за проблем с памятью. Могу ли я разделить по дате прямо в разделе MODULE? Таким образом, раздел будет выглядеть примерно так: Формат ввода: s3://path/MODULE=A --> s3://path/DATE=2020-07-01 где оба модуля A и B присутствуют в разделе DATE=2020-07-01?

Это был мой исходный код, который не удался из-за очень длинного раз в кластере и не хватает памяти:

inpath="s3://path/file/"
outpath="s3://path/file_tmp.parquet"
fs = s3fs.S3FileSystem(anon=False)
uvaDirs = fs.ls(inpath)

#Load Data by Module
for uvapath in uvaDirs:
    customPath='s3://' + uvapath + '/'
    df1=spark.read.parquet(customPath)
    #Perform aggregations and joins
    df1.write.mode('append').parquet(outpath)
    
# Load - partition by date
df2=spark.read.parquet("s3://path/file_tmp.parquet")
df2.write.mode('overwrite').partitionBy("DATE").parquet("s3://path/final.parquet")

Он успешно создает file_tmp.parquet, но не работает при загрузке и разбиении по дате. Любая помощь будет принята с благодарностью! Спасибо

1 Ответ

0 голосов
/ 12 июля 2020

как источник данных дельты может сделать это, сохранить дельта как паркет

(spark.read
 .format("delta")
 .load(path)
 .where(partition)
 .repartition(numFilesPerPartition)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", partition)
 .save(path))

// clean old file
val deltaTable = DeltaTable.forPath(spark, tablePath)
deltaTable.vacuum(0)

ссылка: https://docs.delta.io/latest/best-practices.html# -delta-compact-files & language- python

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...