У меня есть искровое задание, которое объединяет 2 набора данных, выполняет некоторые преобразования и сокращает данные для выдачи. Размер входного файла на данный момент довольно мал (200 МБ каждый набор данных), но после объединения, как вы можете видеть в DAG, задание застревает и никогда не переходит к этапу 4. Я пытался ждать часами, и он выдал OOM и показал неудачные задачи для этапа 4.
- Почему Spark не показывает этап 4 (этап преобразования данных) активным после этапа 3 (этап присоединения)? Он застрял в тасовке между этапами 3 и 4?
- Что я могу сделать, чтобы улучшить производительность моей искровой работы? Я попытался увеличить количество разделов в случайном порядке и все еще тот же результат.
Код задания:
joinedDataset.groupBy("group_field")
.agg(collect_set("name").as("names")).select("names").as[List[String]]
.rdd. //converting to rdd since I need to use reduceByKey
.flatMap(entry => generatePairs(entry)) // this line generates pairs of words out of input text, so data size increases here
.map(pair => ((pair._1, pair._2), 1))
.reduceByKey(_+_)
.sortBy(entry => entry._2, ascending = false)
.coalesce(1)
К вашему сведению В моем кластере 3 рабочих узла с 16 ядрами и 100 ГБ ОЗУ, 3 исполнителя с 16 ядрами (Соотношение 1: 1 с машинами для простоты) и 64 ГБ памяти.
ОБНОВЛЕНИЕ: Оказывается, данные, сгенерированные в моей работе, довольно огромны. Я сделал несколько оптимизаций (стратегически сократил входные данные и удалил несколько дублированных строк из обработки), теперь работа завершается в течение 3 часов. Вход для этапа 4 составляет 200 МБ, а выход - 200 ГБ как таковой. Он использует параллелизм должным образом, но он сосет в случайном порядке. Мой случайный разлив во время этой работы составил 1825 ГБ (память) и 181 ГБ (диск). Может ли кто-нибудь помочь мне с уменьшением количества разливов и продолжительности работы? Спасибо.