pyspark - агрегирование с использованием RDD гораздо быстрее, чем DataFrames - PullRequest
0 голосов
/ 07 мая 2018

Я пытаюсь сделать простой поиск и агрегацию (больших) CSV из грамм Google. Для этого у меня есть широковещательная переменная patterns_set, которой принадлежат все ключи, которые я хочу найти, и затем я ищу их в df, pyspark.sql.DataFrame, созданном в формате databricks.csv. Поэтому я хочу сгруппировать по ngram (столбец 0), а затем суммировать по match_count (столбец 1).

Но есть разница в величине между вычислениями этой работы с RDD или с DataFrames, когда я пытаюсь локально (16 мс против 43 с). Не совсем уверен, что это происходит и в кластере, хотя - это ожидаемое ?

%%time
from operator import itemgetter, add
df.rdd.filter(lambda x: x[0] in patterns_set.value).keyBy(itemgetter(0))\
.mapValues(itemgetter(1))\
.mapValues(int)\
.reduceByKey(add)

И это занимает:

CPU times: user 7.04 ms, sys: 3.24 ms, total: 10.3 ms
Wall time: 16.7 ms

Но при попытке с фреймами данных:

%%time
df.filter(df.ngram.isin(patterns_set.value))\
  .groupby('ngram').sum('match_count')

Время стены намного больше

CPU times: user 6.78 s, sys: 1.54 s, total: 8.32 s
Wall time: 43.3 s

1 Ответ

0 голосов
/ 07 мая 2018

Ваш код не измеряет то, что вы думаете.

Первый фрагмент очень быстрый, потому что он почти ничего не делает. Преобразования RDD являются ленивыми, поэтому данные не затрагиваются вообще (или доступны только для вывода схемы, в зависимости от вышестоящего кода).

С тем, что вы показали, не представляется возможным, почему второй фрагмент медленный, но лучшая ставка - либо инициализация метастаза (если этот фрагмент был фактически выполнен первым), либо время, необходимое для вычисления плана выполнения (это может произойти, особенно с большое количество столбцов). То же, что и первый фрагмент, он (более или менее) ленив, поэтому данные на самом деле не обрабатываются.

...