Spark Job застрял между этапами после объединения - PullRequest
0 голосов
/ 28 апреля 2020

У меня есть искровое задание, которое объединяет 2 набора данных, выполняет некоторые преобразования и сокращает данные для выдачи. Размер входного файла на данный момент довольно мал (200 МБ каждый набор данных), но после объединения, как вы можете видеть в DAG, задание застревает и никогда не переходит к этапу 4. Я пытался ждать часами, и он выдал OOM и показал неудачные задачи для этапа 4.

Spark UI DAG Task Summary

  1. Почему Spark не показывает этап 4 (этап преобразования данных) активным после этапа 3 (этап присоединения)? Он застрял в тасовке между этапами 3 и 4?
  2. Что я могу сделать, чтобы улучшить производительность моей искровой работы? Я попытался увеличить количество разделов в случайном порядке и все еще тот же результат.

Код задания:


joinedDataset.groupBy("group_field")
    .agg(collect_set("name").as("names")).select("names").as[List[String]]
      .rdd.                                     //converting to rdd since I need to use reduceByKey                                  
      .flatMap(entry => generatePairs(entry))   // this line generates pairs of words out of input text, so data size increases here
      .map(pair => ((pair._1, pair._2), 1))
      .reduceByKey(_+_)
      .sortBy(entry => entry._2, ascending = false)
      .coalesce(1)

К вашему сведению В моем кластере 3 рабочих узла с 16 ядрами и 100 ГБ ОЗУ, 3 исполнителя с 16 ядрами (Соотношение 1: 1 с машинами для простоты) и 64 ГБ памяти.

ОБНОВЛЕНИЕ: Оказывается, данные, сгенерированные в моей работе, довольно огромны. Я сделал несколько оптимизаций (стратегически сократил входные данные и удалил несколько дублированных строк из обработки), теперь работа завершается в течение 3 часов. Вход для этапа 4 составляет 200 МБ, а выход - 200 ГБ как таковой. Он использует параллелизм должным образом, но он сосет в случайном порядке. Мой случайный разлив во время этой работы составил 1825 ГБ (память) и 181 ГБ (диск). Может ли кто-нибудь помочь мне с уменьшением количества разливов и продолжительности работы? Спасибо.

1 Ответ

0 голосов
/ 28 апреля 2020

Попробуйте выполнить первоначальную сортировку по исполнителю, а затем уменьшите + сортируйте их

joinedDataset.groupBy("group_field")
    .agg(collect_set("name").as("names")).select("names").as[List[String]]
      .rdd.                                     //converting to rdd since I need to use reduceByKey                                  
      .flatMap(entry => generatePairs(entry))   // this line generates pairs of words out of input text, so data size increases here
      .map(pair => ((pair._1, pair._2), 1))
      .sortBy(entry => entry._2, ascending = false) // Do a initial sort on executors
      .reduceByKey(_+_)
      .sortBy(entry => entry._2, ascending = false) 
      .coalesce(1)
...