Добавление значения в столбец, если выполняется определенное условие для каждой группы в искре - PullRequest
1 голос
/ 10 июля 2020

У меня есть кое-что довольно простое. Для каждой группы он начинается с 1, если условие выполнено, следующие строки являются значением предыдущей строки +1. Это идет все дальше и дальше в группе, каждый раз, когда условие выполняется, добавляйте 1.

Таблица ниже может показать это более четко. (Я пытаюсь создать столбец what_i_want)

group   to_add_number   what_i_want
aaaaaa  0                 1
aaaaaa  0                 1
aaaaaa  1                 2
aaaaaa  0                 2
aaaaaa  0                 2
aaaaaa  1                 3
aaaaaa  0                 3
aaaaaa  0                 3
bbbbbb  0                 1
bbbbbb  1                 2
bbbbbb  1                 3
bbbbbb  0                 3
cccccc  0                 1
cccccc  0                 1
cccccc  0                 1
cccccc  1                 2

Я думаю, что оконная функция (задержка) могла бы это сделать, но я не могу этого добиться.

Что я пробовал это:

from pyspark.sql.functions import lit,when,lag,row_number
from pyspark.sql.window import Window

windowSpec=Window.partitionBy('group')
df=df.withColumn('tmp_rnk',lit(1))
df=df.withColumn('what_i_want',when(col('to_add_number')==0,lag('tmp_rnk').over(windowSpec)).otherwise(col('what_i_want')+1)

or

df=df.withColumn('tmp_rnk',lit(1))
df=df.withColumn('row_number_rank',row_number().over(windowSpec))
df=df.withColumn('what_i_want',when((col('to_add_number')==0)&(col('row_number_rank')==1)
,lit(1)
.when(col('to_add_number')==0)&(col('row_number_rank')>1),lag('what_i_want').over(windowSpec).otherwise(col('what_i_want')+1)

Я пробовал несколько вариантов, искал в stackoverflow с точки зрения 'условных оконных функций', 'отставание, опережение ....), но ничего не сработало, или я не нашел повторяющегося вопроса .

1 Ответ

3 голосов
/ 10 июля 2020

Чтобы получить столбец what_i_want, вы можете запустить incremental sum на to_add_number со столбцом orderby (order_id ).

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df.withColumn("order_id", F.monotonically_increasing_id())\
  .withColumn("what_i_want", F.sum("to_add_number").over(Window().partitionBy("group").orderBy("order_id"))+1)\
  .orderBy("order_id").drop("order_id").show()


#+------+-------------+-----------+
#| group|to_add_number|what_i_want|
#+------+-------------+-----------+
#|aaaaaa|            0|          1|
#|aaaaaa|            0|          1|
#|aaaaaa|            1|          2|
#|aaaaaa|            0|          2|
#|aaaaaa|            0|          2|
#|aaaaaa|            1|          3|
#|aaaaaa|            0|          3|
#|aaaaaa|            0|          3|
#|bbbbbb|            0|          1|
#|bbbbbb|            1|          2|
#|bbbbbb|            1|          3|
#|bbbbbb|            0|          3|
#|cccccc|            0|          1|
#|cccccc|            0|          1|
#|cccccc|            0|          1|
#|cccccc|            1|          2|
#+------+-------------+-----------+
...