Вот рабочий процесс, который мы реализовали в Spark (Scala).
1 - Чтение данных из Cassandra, создание DataSet уникальных идентификаторов с соответствующими значениями атрибутов float
.
2 - преобразовать этот набор данных в DataFrame и кэшировать его в Spark. Это целевой фрейм данных, который должен использоваться для каждого входящего запроса «вычисление оценки соответствия».
3 - для каждого запроса рассчитайте оценку соответствия между входящим значением атрибута и целевым значением атрибута каждой строки в целевом фрейме данных. using a scala UDF
. Создайте новый DataFrame, который включает уникальные идентификаторы с оценками совпадений.
4- Отфильтруйте, отсортируйте и соберите 3 лучших результата совпадений из этого второго DataFrame.
Когда я отправляю задание, оно занимает около 3 секунды для завершения для целевого DataFrame из 1 миллиона строк.
Я заменяю шаг (4) выше простым действием count () и позволяю Spark просто вычислить второй DataFrame с оценками совпадений и произвести подсчет ( ). Это занимает около 0,2 секунды.
Кроме того, когда я проверяю эту упрощенную работу, я замечаю, что только 75 мс c тратится на создание второго DataFrame, в то время как около 85 мс c тратится на подсчет ( ) -если это имеет смысл.
Итак, я прихожу к выводу, что мне нужно найти способ избавиться от той части, которая требует перетасовки (4).
Я думал о ниже механизм и хотел бы знать, возможно ли это технически:
1 - То же, что указано выше
2 - Также то же
3 - То же самое, но с добавленной логикой / механизм, как показано ниже:
- Создайте аккумулятор для хранения
list
пар (оценка совпадения, уникальный идентификатор) - Создайте локальную переменную scala. При вычислении оценки соответствия для каждой строки целевого фрейма данных и вычисления второго фрейма данных обновите эту локальную переменную, указав наивысшую вычисленную оценку
on that executor
. Таким образом, каждый исполнитель должен иметь свой собственный высший расчетный балл (вместе с уникальным идентификатором), хранящийся в этой переменной. - Добавить значение, хранящееся в этой локальной переменной исполнителя, в аккумулятор. Чтобы в конце каждого задания у нас был список
(score,ID)
кортежей в драйвере. - Вернуть 3 верхних элемента из списка (аккумулятор)
4 - Не run
Я думаю, если этот механизм МОЖЕТ быть реализован, мы могли бы получить огромное увеличение производительности с 3 секунд до примерно 0,1 секунды.
Как вы думаете, это технически возможно? Могу ли я создать локальную переменную scala, которая будет обновляться в каждом исполнителе отдельно во время выполнения задания и будет сброшена в начале следующего / следующего задания вычисления совпадения?
Как насчет аккумулятора? Будет ли он автоматически сбрасываться в конце каждого задания? или мне следует программно сбросить эти локальные переменные и аккумуляторы?
Большое спасибо.