Я анализирую производительность моего искрового приложения в случае небольших наборов данных. У меня есть график линий, который выглядит примерно так:
someList.toDS()
.repartition(x)
.mapPartitions(func1)
.mapPartitions(func2)
.mapPartitions(func3)
.filter(cond1)
.count()
У меня есть кластер из 2 узлов с 8 ядрами на каждом. Исполнители настроены на использование 4 ядер. Итак, когда приложение запущено, четыре исполнителя работают с четырьмя ядрами каждое.
Я наблюдаю как минимум (и обычно только) 1 задачу в каждом потоке (т.е. всего 16 задач), которая занимает намного больше времени, чем другие. задания. Например, в одном запуске эти задачи занимают приблизительно 15-20 секунд по сравнению с другими задачами, выполняющимися в секунду или менее.
При профилировании кода я обнаружил, что узкое место в func3 выше:
def func3 = (partition: Iterator[DomainObject]) => {
val l = partition.toList // This takes almost all of the time
val t = doSomething(l)
}
Преобразование из Итератора в Список занимает почти все время.
Размер раздела очень мал (в некоторых случаях даже меньше 50). Даже тогда размер раздела практически одинаков для разных разделов, но только одна задача на поток занимает время.
Я бы предположил, что к тому времени, когда func3
запускается на исполнителе для задачи, данные в этом разделе уже будут присутствовать на исполнителе. Разве это не так? (Он выполняет итерацию по всему набору данных, чтобы как-то отфильтровать данные для этого раздела во время выполнения func3?!)
Иначе, почему преобразование из Iterator
более чем пятидесяти объектов в List
занимают столько времени?
Еще одна вещь, которую я отмечаю (не уверен, что это актуально) - это время G C (в соответствии с искровым интерфейсом) для этих задач также необычно согласовано 2 с для всех эти шестнадцать заданий по сравнению с другими заданиями (даже тогда, 2 с << 20 с) </p>
Обновление: Ниже показано, как выглядит временная шкала для четырех исполнителей: