Оконная функция с динамической задержкой - PullRequest
0 голосов
/ 27 сентября 2018

Я смотрю на функцию слайда окна для Spark DataFrame в Spark SQL.

У меня есть кадр данных со столбцами id, month и volume.

id       month   volume new_col
1        201601  100     0
1        201602  120   100
1        201603  450   220
1        201604  200   670
1        201605  121   870

Теперь я хочу добавить новый столбец с именем new_col, значение new_col является суммой volume и new_col перед текущей строкой, как показано выше.Значение первой строки new_col будет равно нулю.

Я опробовал ниже вариант использования оконной функции lag с помощью PySpark.Но я обнаружил, что столбец new_col будет использоваться рекурсивно.Способ только с помощью функции lag не может сделать это:

window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
df.withColumn('new_col', F.lag(col('volume'), 1).over(window) + F.lag(col('new_col'), 1).over(window))

Есть ли способ динамически отстать от new_col с помощью оконных функций?Или есть другие хорошие решения?

Ответы [ 2 ]

0 голосов
/ 27 сентября 2018

Вы можете использовать вложенные оконные функции

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> 
>>> data = sc.parallelize([
...     (1,'201601',100),
...     (1,'201602',120),
...     (1,'201603',450),
...     (1,'201604',200),
...     (1,'201605',121)])
>>> col = ['id','month', 'volume']
>>> 
>>> df = spark.createDataFrame(data, col)
>>> df.show()
+---+------+------+
| id| month|volume|
+---+------+------+
|  1|201601|   100|
|  1|201602|   120|
|  1|201603|   450|
|  1|201604|   200|
|  1|201605|   121|
+---+------+------+

>>> window1 = Window.partitionBy('id').orderBy('month')
>>> window2 = Window.partitionBy('id').orderBy('month').rangeBetween(Window.unboundedPreceding, 0)
>>> df = df.withColumn('new_col', F.sum(F.lag('volume').over(window1)).over(window2)).na.fill({'new_col': 0})
>>> df.show()
+---+------+------+-------+                                                     
| id| month|volume|new_col|
+---+------+------+-------+
|  1|201601|   100|      0|
|  1|201602|   120|    100|
|  1|201603|   450|    220|
|  1|201604|   200|    670|
|  1|201605|   121|    870|
+---+------+------+-------+
0 голосов
/ 27 сентября 2018

Вы можете использовать lag и sum над окном, чтобы достичь этого.sum будет автоматически вычислять cumsum , если используется над окном.Приведенный ниже код будет сначала отставать от столбца volume, а затем принимать его значение, но выполнение операций в обратном порядке также возможно.

window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
df.withColumn('new_col', F.sum(F.lag(col('volume'), 1, 0).over(window)).over(window))
...