Оптимизация производительности Pyspark для соответствия Pandas / Dask? - PullRequest
0 голосов
/ 04 сентября 2018

У меня есть данные еженедельных временных рядов, и я пытаюсь использовать Pyspark SQL для вычисления для нескольких столбцов еженедельной суммы за последние 8 недель. Я пытался использовать оконные функции Pyspark; в частности:

sum(df[valueCol]).over(partitionBy(df[idCol]).orderBy(df[timeCol]).rangeBetween(-7, 0))

Но этот код выполняется очень медленно (30-60 секунд на столбец для 1000 уникальных идентификаторов и 170 временных шагов). Из других вопросов StackOverflow я понимаю, что разбиения и перестановки могут вызывать проблемы с производительностью, поэтому для лучшего понимания этих проблем я вручную вычисляю 8 самых последних значений за неделю для каждой недели в 8 столбцах, а затем добавлю эти столбцы, чтобы получить последние 8 недель. 'sum.

Вот упрощенный набор данных, который я создал:

idCount = 2
tCount = 10

df = pd.DataFrame({'customerId': [x for x in range(idCount) for y in range(tCount)],
               't': [y for x in range(idCount) for y in range(tCount)],
               'vals': range(idCount * tCount)})[['customerId', 't', 'vals']]

, который создает этот фрейм данных:

Входной кадр данных

   customerId   t   vals
0           0   0   0
1           0   1   1
2           0   2   2
3           0   3   3
4           0   4   4
5           0   5   5
6           0   6   6
7           0   7   7
8           0   8   8
9           0   9   9
10          1   0   10
11          1   1   11
12          1   2   12
13          1   3   13
14          1   4   14
15          1   5   15
16          1   6   16
17          1   7   17
18          1   8   18
19          1   9   19

Моя цель - 8 еженедельных отстающих столбцов «vals», включая vals_0 в качестве значения текущей недели, с NaN, где данные недоступны:

Кадр выходных данных цели

    customerId  t  vals_0  vals_1  vals_2  vals_3  vals_4  vals_5  vals_6  vals_7
0            0  0       0     NaN     NaN     NaN     NaN     NaN     NaN     NaN
1            0  1       1     0.0     NaN     NaN     NaN     NaN     NaN     NaN
2            0  2       2     1.0     0.0     NaN     NaN     NaN     NaN     NaN
3            0  3       3     2.0     1.0     0.0     NaN     NaN     NaN     NaN
4            0  4       4     3.0     2.0     1.0     0.0     NaN     NaN     NaN
5            0  5       5     4.0     3.0     2.0     1.0     0.0     NaN     NaN
6            0  6       6     5.0     4.0     3.0     2.0     1.0     0.0     NaN
7            0  7       7     6.0     5.0     4.0     3.0     2.0     1.0     0.0
8            0  8       8     7.0     6.0     5.0     4.0     3.0     2.0     1.0
9            0  9       9     8.0     7.0     6.0     5.0     4.0     3.0     2.0
10           1  0      10     NaN     NaN     NaN     NaN     NaN     NaN     NaN
11           1  1      11    10.0     NaN     NaN     NaN     NaN     NaN     NaN
12           1  2      12    11.0    10.0     NaN     NaN     NaN     NaN     NaN
13           1  3      13    12.0    11.0    10.0     NaN     NaN     NaN     NaN
14           1  4      14    13.0    12.0    11.0    10.0     NaN     NaN     NaN
15           1  5      15    14.0    13.0    12.0    11.0    10.0     NaN     NaN
16           1  6      16    15.0    14.0    13.0    12.0    11.0    10.0     NaN
17           1  7      17    16.0    15.0    14.0    13.0    12.0    11.0    10.0
18           1  8      18    17.0    16.0    15.0    14.0    13.0    12.0    11.0
19           1  9      19    18.0    17.0    16.0    15.0    14.0    13.0    12.0

Следующая функция Pandas создает целевой кадр данных:

def get_lag_cols_pandas(df, partCol, timeCol, lagCol, numLags):
    newdf = df[[partCol, timeCol, lagCol]]
    for x in range(numLags):
        newCol = '{}_{}'.format(lagCol, x)
        joindf = newdf[[partCol, timeCol, lagCol]]
        joindf[timeCol] = newdf[timeCol] + x
        joindf = joindf.rename(columns = {lagCol: newCol})
        newdf = newdf.merge(joindf, how = 'left', on = [partCol, timeCol])
    return newdf.drop(lagCol, axis = 1)

и работает примерно через 500 мс:

>>> %timeit print('pandas result: \n{}\n\n'.format(get_lag_cols_pandas(df, 'customerId', 't', 'vals', 8)))
1 loop, best of 3: 501 ms per loop

Я также могу выполнить это в Dask, используя map_partitions() и получить те же результаты через ~ 900 мс (предположительно хуже, чем у Pandas из-за накладных расходов при вращении потока):

>>> ddf = dd.from_pandas(df, npartitions = 1)
>>> %timeit print('dask result: \n{}\n\n'.format(ddf.map_partitions(lambda df: get_lag_cols_pandas(df, \
                                                    'customerId', 't', 'vals', 8)).compute(scheduler = 'threads')))
1 loop, best of 3: 893 ms per loop

Я также могу выполнить это в Pyspark ( Примечание: для Dask и Spark у меня есть только один раздел, чтобы сделать более справедливое сравнение с Pandas ):

>>> sparkType = SparkSession.builder.master('local[1]')
>>> spark = sparkType.getOrCreate()
>>> sdf = spark.createDataFrame(df)
>>> sdf.show()
+----------+---+----+
|customerId|  t|vals|
+----------+---+----+
|         0|  0|   0|
|         0|  1|   1|
|         0|  2|   2|
|         0|  3|   3|
|         0|  4|   4|
|         0|  5|   5|
|         0|  6|   6|
|         0|  7|   7|
|         0|  8|   8|
|         0|  9|   9|
|         1|  0|  10|
|         1|  1|  11|
|         1|  2|  12|
|         1|  3|  13|
|         1|  4|  14|
|         1|  5|  15|
|         1|  6|  16|
|         1|  7|  17|
|         1|  8|  18|
|         1|  9|  19|
+----------+---+----+
>>> sdf.rdd.getNumPartitions()
1

со следующим кодом:

def get_lag_cols_spark(df, partCol, timeCol, lagCol, numLags):
    newdf = df.select(df[partCol], df[timeCol], df[lagCol])
    for x in range(numLags):
        newCol = '{}_{}'.format(lagCol, x)
        joindf = newdf.withColumn('newIdx', newdf[timeCol] + x) \
                                     .drop(timeCol).withColumnRenamed('newIdx', timeCol) \
                                     .withColumnRenamed(lagCol, newCol)
        newdf = newdf.join(joindf.select(joindf[partCol], joindf[timeCol], joindf[newCol]), [partCol, timeCol], how = 'left')
    newdf = newdf.drop(lagCol)
    return newdf

Я получаю верные результаты (хотя и перемешанный):

+----------+---+------+------+------+------+------+------+------+------+
|customerId|  t|vals_0|vals_1|vals_2|vals_3|vals_4|vals_5|vals_6|vals_7|
+----------+---+------+------+------+------+------+------+------+------+
|         1|  3|    13|    12|    11|    10|  null|  null|  null|  null|
|         1|  0|    10|  null|  null|  null|  null|  null|  null|  null|
|         1|  1|    11|    10|  null|  null|  null|  null|  null|  null|
|         0|  9|     9|     8|     7|     6|     5|     4|     3|     2|
|         0|  1|     1|     0|  null|  null|  null|  null|  null|  null|
|         1|  4|    14|    13|    12|    11|    10|  null|  null|  null|
|         0|  4|     4|     3|     2|     1|     0|  null|  null|  null|
|         0|  3|     3|     2|     1|     0|  null|  null|  null|  null|
|         0|  7|     7|     6|     5|     4|     3|     2|     1|     0|
|         1|  5|    15|    14|    13|    12|    11|    10|  null|  null|
|         1|  6|    16|    15|    14|    13|    12|    11|    10|  null|
|         0|  6|     6|     5|     4|     3|     2|     1|     0|  null|
|         1|  7|    17|    16|    15|    14|    13|    12|    11|    10|
|         0|  8|     8|     7|     6|     5|     4|     3|     2|     1|
|         0|  0|     0|  null|  null|  null|  null|  null|  null|  null|
|         0|  2|     2|     1|     0|  null|  null|  null|  null|  null|
|         1|  2|    12|    11|    10|  null|  null|  null|  null|  null|
|         1|  9|    19|    18|    17|    16|    15|    14|    13|    12|
|         0|  5|     5|     4|     3|     2|     1|     0|  null|  null|
|         1|  8|    18|    17|    16|    15|    14|    13|    12|    11|
+----------+---+------+------+------+------+------+------+------+------+

но версия Pyspark занимает значительно дольше для запуска (34 секунды):

>>> %timeit get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).show()
1 loop, best of 3: 34 s per loop

Я держал этот пример небольшим и простым (всего 20 строк данных, только 1 раздел для Dask и Spark), поэтому я не ожидал, что использование памяти и ЦП приведет к значительным различиям в производительности.

Мой вопрос: Есть ли способ лучше настроить Pyspark или оптимизировать выполнение Pyspark для этой конкретной задачи, чтобы приблизить Pyspark к Pandas и Dask по скорости (т. Е. 0,5-1,0 секунды)?

1 Ответ

0 голосов
/ 05 сентября 2018

pyspark является медленным по определению, так как Spark написан на Scala, и любая программа pyspark включает в себя запуск как минимум 1 JVM (обычно 1 драйвер и несколько рабочих) и программ на python (1 на одного работника) и обмен данными между ними. Объем межпроцессного взаимодействия между Java и Python зависит от кода Python, который вы используете.

Даже без всякой межъязыковой шумихи у Spark много накладных расходов, направленных на обработку распределенной обработки больших данных - это означает, что программы Spark, как правило, работают медленнее, чем любые нераспределенные решения ... до тех пор, пока масштаб маленький. Искра и pyspark специально созданы для больших масштабов, и вот где он сияет

...