Я предполагаю, что name
- это отдельное значение для каждого advertiser_id
, и поэтому ваш набор данных сортируется по name
.Я также предполагаю, что max_total_advertiser
содержит одинаковое значение для каждого advertiser_id
.Если это не так, добавьте комментарий.
Вам нужно окно rangeBetween , в котором отображаются все предшествующие и последующие строки в указанном диапазоне.Мы будем использовать Window.unboundedPreceding
, так как мы хотим суммировать все предыдущие значения.
import pyspark.sql.functions as F
from pyspark.sql import Window
l = [
(4061, 'source1',-434.955284,-354882.75336200005, -355938.53950700007),
(4061, 'source2',-594.012216,-355476.76557800005, -345938.53950700007),
(4062, 'source1',-594.012216,-355476.76557800005, -5938.53950700007),
(4062, 'source2',-594.012216,-355476.76557800005, -5938.53950700007),
(4061, 'source3',-461.773929,-355938.53950700007, -355938.53950700007)
]
columns = ['advertiser_id','name' ,'amount', 'total', 'max_total_advertiser']
df=spark.createDataFrame(l, columns)
w = Window.partitionBy('advertiser_id').orderBy('name').rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn('total', F.sum('amount').over(w) + df.max_total_advertiser)
df.show()
Вывод:
+-------------+-------+-----------+-------------------+--------------------+
|advertiser_id| name| amount| total|max_total_advertiser|
+-------------+-------+-----------+-------------------+--------------------+
| 4062|source1|-594.012216|-6532.5517230000705| -5938.53950700007|
| 4062|source2|-594.012216| -7126.563939000071| -5938.53950700007|
| 4061|source1|-434.955284| -356373.4947910001| -355938.53950700007|
| 4061|source2|-594.012216| -346967.5070070001| -345938.53950700007|
| 4061|source3|-461.773929|-357429.28093600005| -355938.53950700007|
+-------------+-------+-----------+-------------------+--------------------+