Рассмотрим общий алгоритм:
val first : DataFrame = ... // about 100 Mb
val second : DataFrame = ... // about 5 GMb
val third : DataFrame = ... // about 7 GMb
val fourth : DataFrame = ... // about 13 GMb
//all dataframe are filtered, renamed all columns. A new colun is added into `third` and `fourth`
val = firstAndSecond = first.join(first("first_id") === second("second_id"))
val = thirdsAndAll = firstAndSecond.join(firstAndSecond("some_id") === third("third_id"))
val = fourthAndAll = thirdsAndAll.join(thirdsAndAll("other_id") === fourth("fourth_id"))
fourthAndAll.write.mode(saveMode = SaveMode.Overwrite).parquet("file://C:path")
Примечания *
- Все кадры данных считываются и записываются с SSD-диска.
- Операция чтения и записи, выполняемая в / в файлах паркета
- Программа была запущена на Threadripper с 8 ядрами (16 виртуальных), 80 ГБ ОЗУ (искра потребляла около 25 ГБ), также 99% времени (кроме ситуации, когдапоследний файл записывается) все 16 ядер загружены на 100%.
Проблема
У меня очень разные размеры в выходных файлах паркета от 100 кб до 500 нб,Также большие файлы имеют очень много времени для записи. Например, каждый файл записывается некоторыми потоками, которые записывают файлы размером 500 МБ, 450 МБ (и т. Д.), Выполняющие свою работу слишком долго. (для 500Мб это было около 8 часов)
Есть мысли, как настроить спарк для записи файла паркета с более или менее равным размером загрузки процессора?