У меня есть данные еженедельных временных рядов, и я пытаюсь использовать Pyspark SQL для вычисления для нескольких столбцов еженедельной суммы за последние 8 недель. Я пытался использовать оконные функции Pyspark; в частности:
sum(df[valueCol]).over(partitionBy(df[idCol]).orderBy(df[timeCol]).rangeBetween(-7, 0))
Но этот код выполняется очень медленно (30-60 секунд на столбец для 1000 уникальных идентификаторов и 170 временных шагов). Из других вопросов StackOverflow я понимаю, что разбиения и перестановки могут вызывать проблемы с производительностью, поэтому для лучшего понимания этих проблем я вручную вычисляю 8 самых последних значений за неделю для каждой недели в 8 столбцах, а затем добавлю эти столбцы, чтобы получить последние 8 недель. 'sum.
Вот упрощенный набор данных, который я создал:
idCount = 2
tCount = 10
df = pd.DataFrame({'customerId': [x for x in range(idCount) for y in range(tCount)],
't': [y for x in range(idCount) for y in range(tCount)],
'vals': range(idCount * tCount)})[['customerId', 't', 'vals']]
, который создает этот фрейм данных:
Входной кадр данных
customerId t vals
0 0 0 0
1 0 1 1
2 0 2 2
3 0 3 3
4 0 4 4
5 0 5 5
6 0 6 6
7 0 7 7
8 0 8 8
9 0 9 9
10 1 0 10
11 1 1 11
12 1 2 12
13 1 3 13
14 1 4 14
15 1 5 15
16 1 6 16
17 1 7 17
18 1 8 18
19 1 9 19
Моя цель - 8 еженедельных отстающих столбцов «vals», включая vals_0 в качестве значения текущей недели, с NaN, где данные недоступны:
Кадр выходных данных цели
customerId t vals_0 vals_1 vals_2 vals_3 vals_4 vals_5 vals_6 vals_7
0 0 0 0 NaN NaN NaN NaN NaN NaN NaN
1 0 1 1 0.0 NaN NaN NaN NaN NaN NaN
2 0 2 2 1.0 0.0 NaN NaN NaN NaN NaN
3 0 3 3 2.0 1.0 0.0 NaN NaN NaN NaN
4 0 4 4 3.0 2.0 1.0 0.0 NaN NaN NaN
5 0 5 5 4.0 3.0 2.0 1.0 0.0 NaN NaN
6 0 6 6 5.0 4.0 3.0 2.0 1.0 0.0 NaN
7 0 7 7 6.0 5.0 4.0 3.0 2.0 1.0 0.0
8 0 8 8 7.0 6.0 5.0 4.0 3.0 2.0 1.0
9 0 9 9 8.0 7.0 6.0 5.0 4.0 3.0 2.0
10 1 0 10 NaN NaN NaN NaN NaN NaN NaN
11 1 1 11 10.0 NaN NaN NaN NaN NaN NaN
12 1 2 12 11.0 10.0 NaN NaN NaN NaN NaN
13 1 3 13 12.0 11.0 10.0 NaN NaN NaN NaN
14 1 4 14 13.0 12.0 11.0 10.0 NaN NaN NaN
15 1 5 15 14.0 13.0 12.0 11.0 10.0 NaN NaN
16 1 6 16 15.0 14.0 13.0 12.0 11.0 10.0 NaN
17 1 7 17 16.0 15.0 14.0 13.0 12.0 11.0 10.0
18 1 8 18 17.0 16.0 15.0 14.0 13.0 12.0 11.0
19 1 9 19 18.0 17.0 16.0 15.0 14.0 13.0 12.0
Следующая функция Pandas создает целевой кадр данных:
def get_lag_cols_pandas(df, partCol, timeCol, lagCol, numLags):
newdf = df[[partCol, timeCol, lagCol]]
for x in range(numLags):
newCol = '{}_{}'.format(lagCol, x)
joindf = newdf[[partCol, timeCol, lagCol]]
joindf[timeCol] = newdf[timeCol] + x
joindf = joindf.rename(columns = {lagCol: newCol})
newdf = newdf.merge(joindf, how = 'left', on = [partCol, timeCol])
return newdf.drop(lagCol, axis = 1)
и работает примерно через 500 мс:
>>> %timeit print('pandas result: \n{}\n\n'.format(get_lag_cols_pandas(df, 'customerId', 't', 'vals', 8)))
1 loop, best of 3: 501 ms per loop
Я также могу выполнить это в Dask, используя map_partitions()
и получить те же результаты через ~ 900 мс (предположительно хуже, чем у Pandas из-за накладных расходов при вращении потока):
>>> ddf = dd.from_pandas(df, npartitions = 1)
>>> %timeit print('dask result: \n{}\n\n'.format(ddf.map_partitions(lambda df: get_lag_cols_pandas(df, \
'customerId', 't', 'vals', 8)).compute(scheduler = 'threads')))
1 loop, best of 3: 893 ms per loop
Я также могу выполнить это в Pyspark ( Примечание: для Dask и Spark у меня есть только один раздел, чтобы сделать более справедливое сравнение с Pandas ):
>>> sparkType = SparkSession.builder.master('local[1]')
>>> spark = sparkType.getOrCreate()
>>> sdf = spark.createDataFrame(df)
>>> sdf.show()
+----------+---+----+
|customerId| t|vals|
+----------+---+----+
| 0| 0| 0|
| 0| 1| 1|
| 0| 2| 2|
| 0| 3| 3|
| 0| 4| 4|
| 0| 5| 5|
| 0| 6| 6|
| 0| 7| 7|
| 0| 8| 8|
| 0| 9| 9|
| 1| 0| 10|
| 1| 1| 11|
| 1| 2| 12|
| 1| 3| 13|
| 1| 4| 14|
| 1| 5| 15|
| 1| 6| 16|
| 1| 7| 17|
| 1| 8| 18|
| 1| 9| 19|
+----------+---+----+
>>> sdf.rdd.getNumPartitions()
1
со следующим кодом:
def get_lag_cols_spark(df, partCol, timeCol, lagCol, numLags):
newdf = df.select(df[partCol], df[timeCol], df[lagCol])
for x in range(numLags):
newCol = '{}_{}'.format(lagCol, x)
joindf = newdf.withColumn('newIdx', newdf[timeCol] + x) \
.drop(timeCol).withColumnRenamed('newIdx', timeCol) \
.withColumnRenamed(lagCol, newCol)
newdf = newdf.join(joindf.select(joindf[partCol], joindf[timeCol], joindf[newCol]), [partCol, timeCol], how = 'left')
newdf = newdf.drop(lagCol)
return newdf
Я получаю верные результаты (хотя и перемешанный):
+----------+---+------+------+------+------+------+------+------+------+
|customerId| t|vals_0|vals_1|vals_2|vals_3|vals_4|vals_5|vals_6|vals_7|
+----------+---+------+------+------+------+------+------+------+------+
| 1| 3| 13| 12| 11| 10| null| null| null| null|
| 1| 0| 10| null| null| null| null| null| null| null|
| 1| 1| 11| 10| null| null| null| null| null| null|
| 0| 9| 9| 8| 7| 6| 5| 4| 3| 2|
| 0| 1| 1| 0| null| null| null| null| null| null|
| 1| 4| 14| 13| 12| 11| 10| null| null| null|
| 0| 4| 4| 3| 2| 1| 0| null| null| null|
| 0| 3| 3| 2| 1| 0| null| null| null| null|
| 0| 7| 7| 6| 5| 4| 3| 2| 1| 0|
| 1| 5| 15| 14| 13| 12| 11| 10| null| null|
| 1| 6| 16| 15| 14| 13| 12| 11| 10| null|
| 0| 6| 6| 5| 4| 3| 2| 1| 0| null|
| 1| 7| 17| 16| 15| 14| 13| 12| 11| 10|
| 0| 8| 8| 7| 6| 5| 4| 3| 2| 1|
| 0| 0| 0| null| null| null| null| null| null| null|
| 0| 2| 2| 1| 0| null| null| null| null| null|
| 1| 2| 12| 11| 10| null| null| null| null| null|
| 1| 9| 19| 18| 17| 16| 15| 14| 13| 12|
| 0| 5| 5| 4| 3| 2| 1| 0| null| null|
| 1| 8| 18| 17| 16| 15| 14| 13| 12| 11|
+----------+---+------+------+------+------+------+------+------+------+
но версия Pyspark занимает значительно дольше для запуска (34 секунды):
>>> %timeit get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).show()
1 loop, best of 3: 34 s per loop
Я держал этот пример небольшим и простым (всего 20 строк данных, только 1 раздел для Dask и Spark), поэтому я не ожидал, что использование памяти и ЦП приведет к значительным различиям в производительности.
Мой вопрос: Есть ли способ лучше настроить Pyspark или оптимизировать выполнение Pyspark для этой конкретной задачи, чтобы приблизить Pyspark к Pandas и Dask по скорости (т. Е. 0,5-1,0 секунды)?