В Spark 2.3 я запускаю следующий код:
rdd
.persist(DISK_ONLY) // this is 3GB according to storage tab
.groupBy(_.key)
.mapValues(iter => iter.map(x => CaseClass(x._1, x._2)))
.mapValues(iter => func(iter))
- У меня есть SQL-кадр данных из 300M строк
- Я преобразую его в RDD, а затем сохраню его:вкладка хранилища показывает, что это 3GB
- Я делаю groupBy.Один из моих ключей - получение 100 млн. Элементов, то есть примерно 1 ГБ, если я выберу размер СДР
- . Я сопоставляю каждый элемент после перемешивания с классом дел.Этот класс case имеет только 2 «двойных» поля
- Я отправляю полный итератор, содержащий все данные раздела, в функцию, которая обработает этот поток
Я наблюдаючто задача, которая обрабатывает 100M класса case, всегда терпит неудачу после 1 часа + обработки.На вкладке «Агрегированные показатели по исполнителю» в пользовательском интерфейсе я вижу ОГРОМНЫЕ значения для столбца «случайного разлива», около 10 ГБ, , что в 3 раза больше, чем размер полного СДР. .Когда я выполняю дамп потока медленного исполнителя, кажется, что он застрял в операциях записи / чтения на диск.
Может кто-нибудь сказать мне, что происходит?Я понимаю, что 100M экземпляров класса case, вероятно, слишком велики, чтобы поместиться в ОЗУ одного исполнителя, но я не понимаю следующее:
1) Разве Spark не должен "передавать" все экземпляры вмоя func
функция?почему он пытается сохранить все данные при получении узла-исполнителя?
2) Откуда происходит увеличение объема памяти?Я не понимаю, почему сериализация 100M экземпляров класса case должна занимать около 10 ГБ, что составляет примерно 100 байт на элемент (если предположить, что данные, которые выливаются на диск, являются экземплярами CaseClass, я не уверен из своей работы, в какой момент данныепролито)