Согласитесь с Владимиром, подумайте о добавлении дополнительных очков.
см. MapStatus установить spark.sql.shuffle.partitions
в 2001
( старый подход ) (по умолчанию 200).
новый подход (spark.shuffle.minNumPartitionsToHighlyCompress
), как Владимир сказал в ответе.
Почему это изменение? : MapStatus имеет 2000 жестко запрограммированных SPARK-24519
будет применяться другой алгоритм для обработки
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
}
HighlyCompressedMapStatus
:
Реализация MapStatus, которая хранит точный размер огромного
блоки, которые больше, чем spark.shuffle.accurateBlockThreshold.
Он хранит средний размер других непустых блоков, плюс растровое изображение
для отслеживания, какие блоки пусты.
spark. Это помогает предотвратить ООМ, избегая недооценки размера блока случайного воспроизведения при извлечении блоков случайного воспроизведения.
CompressedMapStatus
:
Реализация MapStatus, которая отслеживает размер каждого блока. Размер
для каждого блока представлен один байт.
Также установите на spark-submit
--conf spark.yarn.executor.memoryOverhead=<10% of executor memory> -- conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true
в обоих случаях Сжатие будет использовать spark.io.compression.codec
Заключение : большие задачи должны использовать HighlyCompressedMapStatus
, и накладные расходы памяти исполнителя могут составлять 10 процентов от объема памяти вашего исполнителя.
Далее, взгляните на настройка памяти памяти