У меня очень большой фрейм данных размером около 2 ТБ. Есть 2 столбца, по которым я могу их разделить: MODULE
и DATE
Если я разделю их на MODULE
, каждый модуль может иметь одинаковые даты, например, MODULE A
может иметь даты 2020-07-01 , 2020-07-02
и MODULE B
могут иметь 2020-07-01 , 2020-07-05
и c. Мне нужно сначала разделить их по MODULE
и выполнить несколько агрегатов и объединений, прежде чем я окончательно разделю и сохраню их по DATE
. Я использую pyspark для кодирования.
После выполнения агрегации и присоединения с помощью MODULE я добавляю его в файл паркета и загружаю весь файл паркета во фрейм данных, а затем разбиваю его на разделы до ДАТЫ. Проблема в том, что искровое задание завершается из-за проблем с памятью. Могу ли я разделить по дате прямо в разделе MODULE
? Таким образом, раздел будет выглядеть примерно так: Формат ввода: s3://path/MODULE=A --> s3://path/DATE=2020-07-01
где оба модуля A
и B
присутствуют в разделе DATE=2020-07-01
?
Это был мой исходный код, который не удался из-за очень длинного раз в кластере и не хватает памяти:
inpath="s3://path/file/"
outpath="s3://path/file_tmp.parquet"
fs = s3fs.S3FileSystem(anon=False)
uvaDirs = fs.ls(inpath)
#Load Data by Module
for uvapath in uvaDirs:
customPath='s3://' + uvapath + '/'
df1=spark.read.parquet(customPath)
#Perform aggregations and joins
df1.write.mode('append').parquet(outpath)
# Load - partition by date
df2=spark.read.parquet("s3://path/file_tmp.parquet")
df2.write.mode('overwrite').partitionBy("DATE").parquet("s3://path/final.parquet")
Он успешно создает file_tmp.parquet
, но не работает при загрузке и разбиении по дате. Любая помощь будет принята с благодарностью! Спасибо