Когда на самом деле реализуются данные в искровом разделе? - PullRequest
1 голос
/ 20 января 2020

Я анализирую производительность моего искрового приложения в случае небольших наборов данных. У меня есть график линий, который выглядит примерно так:

someList.toDS()
.repartition(x)
.mapPartitions(func1)
.mapPartitions(func2)
.mapPartitions(func3)
.filter(cond1)
.count()

У меня есть кластер из 2 узлов с 8 ядрами на каждом. Исполнители настроены на использование 4 ядер. Итак, когда приложение запущено, четыре исполнителя работают с четырьмя ядрами каждое.

Я наблюдаю как минимум (и обычно только) 1 задачу в каждом потоке (т.е. всего 16 задач), которая занимает намного больше времени, чем другие. задания. Например, в одном запуске эти задачи занимают приблизительно 15-20 секунд по сравнению с другими задачами, выполняющимися в секунду или менее.

При профилировании кода я обнаружил, что узкое место в func3 выше:

def func3 = (partition: Iterator[DomainObject]) => {
  val l = partition.toList          // This takes almost all of the time
  val t = doSomething(l)
}

Преобразование из Итератора в Список занимает почти все время.

Размер раздела очень мал (в некоторых случаях даже меньше 50). Даже тогда размер раздела практически одинаков для разных разделов, но только одна задача на поток занимает время.

Я бы предположил, что к тому времени, когда func3 запускается на исполнителе для задачи, данные в этом разделе уже будут присутствовать на исполнителе. Разве это не так? (Он выполняет итерацию по всему набору данных, чтобы как-то отфильтровать данные для этого раздела во время выполнения func3?!)

Иначе, почему преобразование из Iterator более чем пятидесяти объектов в List занимают столько времени?

Еще одна вещь, которую я отмечаю (не уверен, что это актуально) - это время G C (в соответствии с искровым интерфейсом) для этих задач также необычно согласовано 2 с для всех эти шестнадцать заданий по сравнению с другими заданиями (даже тогда, 2 с << 20 с) </p>

Обновление: Ниже показано, как выглядит временная шкала для четырех исполнителей: spark ui : event timeline for the stage

Ответы [ 2 ]

1 голос
/ 20 января 2020
  1. Первая реализация происходит во время repartition()
  2. Вторая - после операции filter, когда все три mapPartitions начинают выполнение (когда вызывается действие count). В зависимости от вашего doSomething() в каждой из этих функций, это будет зависеть от того, как будет создана группа обеспечения доступности баз данных и где это займет время, и, соответственно, вы сможете оптимизировать.
0 голосов
/ 21 января 2020

Похоже, что данные в разделе становятся доступными, как только задача начинает выполняться (или, по крайней мере, нет существенных затрат на итерацию этих данных, как может показаться из вопроса.)

Узкое место в приведенном выше коде на самом деле в func2 (что я не исследовал должным образом!), И из-за ленивого характера итераторов в scala. Проблема вообще не связана с искрой.

Во-первых, функции в вызовах mapPartitions выше появляются , чтобы получить цепочку и вызвать так: func3( func2( func1(Iterator[A]) ) ) : Iterator[B]. Таким образом, Iterator, полученный как выход func2, подается непосредственно на func3.

Во-вторых, для вышеприведенного выпуска func1func2) определяются как:

func1(x: Iterator[A]) => Iterator[B] = x.map(...).filter...

Поскольку они берут итератор и сопоставляют их с другим итератором, они не выполняются сразу. Но когда func3 выполняется, partition.toList заставляет замыкание карты в func2 быть выполненным. При профилировании кажется, что func3 заняло все время, где вместо func2 есть код, замедляющий работу приложения.

(Спецификация c к вышеуказанной проблеме, func2 содержит некоторую сериализацию объектов case в строку json. Похоже, что для выполнения неявного кода требуется много времени, только для первого объекта в каждом потоке. Так как это происходит один раз для каждого потока, каждый поток имеет только одну задачу, которая занимает очень много времени, и объясняет событие график выше.)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...