Приложение My (Py) Spark 2.1.1 состоит из двух исполнителей с 5 ядрами и кучей 30G (spark.executor.memory
) каждый.У меня есть 3,2 ГБ данных, сохраненных в памяти (десериализованных), распределенных по дюжине разделов и распределенных между моими двумя исполнителями (1,9 ГБ + 1,3 ГБ).Затем я хочу перераспределить эти данные, вызвав repartition('myCol')
на моем постоянном фрейме данных с myCol
, имеющим только три ключа с распределением 60-20-20.Затем я хочу записать перераспределенные данные в файлы (3) .parquet.Как и ожидалось, это преобразование вызывает полное перемешивание данных:
- Первый вопрос: в пользовательском интерфейсе Spark произвольная запись составляет 5,9 ГБ.Почему этот объем намного превышает размер сохраняемых данных?Это формат, который использует Spark для записи случайных файлов на диск (текстовые строки?)?Репликация?
- Второй вопрос: Мои исполнители умирают с сообщениями об ошибках, такими как
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
или ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 32.0 GB of 32 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
.spark.yarn.executor.memoryOverhead
уже установлен на 2g
, но я должен признаться, что я не совсем понимаю, как этот параметр должен помочь в этом контексте.Но главный вопрос: как перетасовать 3Gb данных может OOM исполнитель 30Gb?
Я изменил несколько параметров из моего понимания Spark (очевидно, с ограниченным успехом): я установил spark.memory.fraction
на 0.9
и spark.memory.storageFraction
на 0.0
.
Заранее большое спасибо за любую помощь, эта ситуация очень расстраивает.
PS: Может быть, когда проблема будет решена, я могу изменить дизайнмое приложение с меньшим количеством памяти на исполнителя.В настоящее время мне кажется ужасной тратой ресурсов.