Основная идея заключается в использовании collect_list()
в порядке убывания на Name
и yr
кумулятивно. collect_list()
даст вам массив значений в столбце.
# Creating the DataFrame
df = sc.parallelize([('a',1,100),('a',2,200),('a',3,300),('a',4,400),('a',5,500),('a',6,600),('b',1,23),('b',2,32),('b',3,34),('b',4,55),('b',5,43)]).toDF(['Name','yr','cash'])
df.show()
+----+---+----+
|Name| yr|cash|
+----+---+----+
| a| 1| 100|
| a| 2| 200|
| a| 3| 300|
| a| 4| 400|
| a| 5| 500|
| a| 6| 600|
| b| 1| 23|
| b| 2| 32|
| b| 3| 34|
| b| 4| 55|
| b| 5| 43|
+----+---+----+
После создания df
вы можете использовать функцию Window
для совокупного объединения списка.
# Loading the requisite packages
from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list
w = (Window.partitionBy('Name').orderBy(col('yr').desc()).rangeBetween(Window.unboundedPreceding, 0))
df = df.withColumn('cash_list', collect_list('cash').over(w))
df.show(truncate=False)
+----+---+----+------------------------------+
|Name|yr |cash|cash_list |
+----+---+----+------------------------------+
|b |5 |43 |[43] |
|b |4 |55 |[43, 55] |
|b |3 |34 |[43, 55, 34] |
|b |2 |32 |[43, 55, 34, 32] |
|b |1 |23 |[43, 55, 34, 32, 23] |
|a |6 |600 |[600] |
|a |5 |500 |[600, 500] |
|a |4 |400 |[600, 500, 400] |
|a |3 |300 |[600, 500, 400, 300] |
|a |2 |200 |[600, 500, 400, 300, 200] |
|a |1 |100 |[600, 500, 400, 300, 200, 100]|
+----+---+----+------------------------------+