Производительность Spark: локально быстрее, чем кластер (очень неравномерная нагрузка на исполнителя) - PullRequest
0 голосов
/ 03 октября 2018

позвольте мне начать с того, что я относительно новичок в зажигании, поэтому, если я говорю что-то, что не имеет смысла, просто исправьте меня.

Обобщая проблему, не важно, что я делаюна определенных этапах один исполнитель выполняет все вычисления, что делает выполнение кластера более медленным, чем локальное, однопроцессорное выполнение.

Полная версия: я написал приложение spark 1.6, которое состоит из серии карт, фильтров,соединения и короткая часть graphx.Приложение использует только один источник данных - файл CSV.В целях разработки я создал макетный набор данных, состоящий из 100 000 строк, 7 МБ, со всеми полями, имеющими случайные данные с равномерным распределением (случайная сортировка также в файле).Объединения являются внутренними соединениями в PairRDD в различных областях (набор данных имеет дублирующиеся ключи с ~ 200 дубликатами на ключ, имитирующие реальные данные), что приводит к декартовому произведению внутри ключа.Затем я выполняю ряд операций с картами и фильтрами для результатов объединений, сохраняю их как RDD для некоторых объектов пользовательского класса и сохраняю все в виде графиков в.

Я разработал код на своем ноутбукеи запустите его, что заняло около 5 минут (Windows-машина, локальный файл).К моему удивлению, когда я развернул jar на кластере (основная пряжа, режим кластера, файл в csv в HDFS) и представил его, выполнение кода заняло 8 минут.Я провел тот же эксперимент с меньшими данными, и результаты были 40 секунд локально и 1,1 минуты в кластере.

Когда я смотрел на сервер истории, я видел, что 2 этапа особенно продолжительны (почти 4 минуты каждый), и на этих этапах есть одна задача, которая занимает> 90% времени.Я запускаю код несколько раз, и это всегда была одна и та же задача, которая занимала столько времени, даже если он каждый раз развертывался на разных узлах данных.

К моему удивлению, когда я открывал исполнителей, я видел этуexecutor выполняет почти всю работу (с точки зрения затраченного времени) и выполняет большинство заданий.На приведенном скриншоте у второго наиболее «активного» исполнителя было 50 заданий, но это не всегда так - в другом представлении у второго наиболее занятого исполнителя было 15 задач, а у первого - 95).Executors

Более того, я видел, что для вычислений используется время 3,9 минуты (второй снимок экрана), которое наиболее сильно влияет на объединенные данные вскоре после отображения.Я думал, что данные не могут быть разделены поровну и один исполнитель должен выполнить все вычисления.Поэтому я попытался очистить pairRdd вручную (используя .partitionBy (новый HashPartitioner (40))) прямо перед объединением (аналогичное время выполнения) или сразу после объединения (выполнение еще медленнее).enter image description here

В чем может быть проблема?Любая помощь будет оценена.

1 Ответ

0 голосов
/ 03 октября 2018

Трудно сказать, не видя ваших запросов и не понимая ваш набор данных. Я полагаю, вы не включили его или потому, что он очень сложный или чувствительный?Так что это немного выстрел в темноте, однако это выглядит как проблема, с которой мы столкнулись в моей команде на работе.Мое грубое предположение о том, что происходит, состоит в том, что во время одного из ваших соединений у вас есть ключевое пространство, которое имеет большую мощность, но очень неравномерное распределение.В нашем случае мы подключались к источникам веб-трафика, и хотя у нас есть тысячи возможных источников трафика, подавляющее большинство трафика исходит от нескольких.Это вызвало проблему, когда мы присоединились.Ключи будут равномерно распределены между исполнителями, однако, поскольку, возможно, 95% данных совместно используются, может быть, 3 или 4 ключа, очень небольшое количество исполнителей выполняли большую часть работы.Когда вы находите объединение, которое страдает от этого, нужно выбрать меньший из двух наборов данных и явно выполнить широковещательное соединение.(Обычно Spark будет пытаться это сделать, но не всегда точно сказать, когда это нужно.)

Для этого, скажем, у вас есть два DataFrames.В одном из них есть два столбца: number и stringRep, где число - всего одна строка для всех целых чисел из 0-10000, а stringRep - просто строковое представление этого значения, поэтому «один», «два», «три»"и т. д. Мы назовем это numToString

У другого DataFrame есть некоторый ключевой столбец для соединения с number в numToString, называемый kind, некоторыми другими не относящимися к делу данными и 100 000 000 строк.Мы назовем этот DataFrame ourData.Тогда давайте предположим, что распределение 100 000 000 строк в ourData составляет 90% с kind == 1, 5% с kind == 2, а оставшиеся 5% довольно равномерно распределены между оставшимися 99,998 числами.Когда вы выполняете следующий код:

val numToString: DataFrame = loadNumToString()
val ourData: DataFrame = loadOurCode()

val joined = ourData.join(numToString).where(ourData("kind") === numToString("number"))

... очень вероятно, что Spark отправит% 90 данных (которые имеют kind == 1) одному исполнителю,% 5 данных (то, что имеет kind == 2) другому исполнителю, а оставшиеся% 5 размазаны по остальным, оставляя двух исполнителей с огромными разделами, а остальные с очень маленькими.

Способ, как я упоминал ранее,явно выполнить широковещательное соединение.Для этого нужно взять один DataFrame и полностью распределить его по каждому узлу.Таким образом, вы должны сделать это вместо этого:

val joined = ourData.join(broadcast(numToString)).where(ourData("kind") === numToString("number"))

... что бы отправить numToString каждому исполнителю.Предполагая, что ourData был предварительно равномерно разделен, данные должны оставаться равномерно распределенными по исполнителям.Это может быть не вашей проблемой, но это действительно похоже на проблему, с которой мы столкнулись.Надеюсь, это поможет!

Более подробную информацию о трансляциях можно найти здесь: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...