можно ли устранить необходимость в перетасовке искры, используя локальные переменные в исполнителе и аккумуляторы в драйвере - PullRequest
0 голосов
/ 07 августа 2020

Вот рабочий процесс, который мы реализовали в Spark (Scala).

1 - Чтение данных из Cassandra, создание DataSet уникальных идентификаторов с соответствующими значениями атрибутов float.

2 - преобразовать этот набор данных в DataFrame и кэшировать его в Spark. Это целевой фрейм данных, который должен использоваться для каждого входящего запроса «вычисление оценки соответствия».

3 - для каждого запроса рассчитайте оценку соответствия между входящим значением атрибута и целевым значением атрибута каждой строки в целевом фрейме данных. using a scala UDF. Создайте новый DataFrame, который включает уникальные идентификаторы с оценками совпадений.

4- Отфильтруйте, отсортируйте и соберите 3 лучших результата совпадений из этого второго DataFrame.

Когда я отправляю задание, оно занимает около 3 секунды для завершения для целевого DataFrame из 1 миллиона строк.

Я заменяю шаг (4) выше простым действием count () и позволяю Spark просто вычислить второй DataFrame с оценками совпадений и произвести подсчет ( ). Это занимает около 0,2 секунды.

Кроме того, когда я проверяю эту упрощенную работу, я замечаю, что только 75 мс c тратится на создание второго DataFrame, в то время как около 85 мс c тратится на подсчет ( ) -если это имеет смысл.

Итак, я прихожу к выводу, что мне нужно найти способ избавиться от той части, которая требует перетасовки (4).

Я думал о ниже механизм и хотел бы знать, возможно ли это технически:

1 - То же, что указано выше

2 - Также то же

3 - То же самое, но с добавленной логикой / механизм, как показано ниже:

  • Создайте аккумулятор для хранения list пар (оценка совпадения, уникальный идентификатор)
  • Создайте локальную переменную scala. При вычислении оценки соответствия для каждой строки целевого фрейма данных и вычисления второго фрейма данных обновите эту локальную переменную, указав наивысшую вычисленную оценку on that executor. Таким образом, каждый исполнитель должен иметь свой собственный высший расчетный балл (вместе с уникальным идентификатором), хранящийся в этой переменной.
  • Добавить значение, хранящееся в этой локальной переменной исполнителя, в аккумулятор. Чтобы в конце каждого задания у нас был список (score,ID) кортежей в драйвере.
  • Вернуть 3 верхних элемента из списка (аккумулятор)

4 - Не run

Я думаю, если этот механизм МОЖЕТ быть реализован, мы могли бы получить огромное увеличение производительности с 3 секунд до примерно 0,1 секунды.

Как вы думаете, это технически возможно? Могу ли я создать локальную переменную scala, которая будет обновляться в каждом исполнителе отдельно во время выполнения задания и будет сброшена в начале следующего / следующего задания вычисления совпадения?

Как насчет аккумулятора? Будет ли он автоматически сбрасываться в конце каждого задания? или мне следует программно сбросить эти локальные переменные и аккумуляторы?

Большое спасибо.

...