PySpark Cum Сумма двух значений - PullRequest
1 голос
/ 06 июля 2019

Учитывая следующий пример кадра данных:

advertiser_id| name | amount    | total             |max_total_advertiser|
4061         |source1|-434.955284|-354882.75336200005| -355938.53950700007
4061         |source2|-594.012216|-355476.76557800005| -355938.53950700007
4061         |source3|-461.773929|-355938.53950700007| -355938.53950700007

Мне нужно сложить сумму и поле max_total_advertiser , чтобы получить правильное общее значение в каждой строке. Принимая во внимание, что мне нужно это значение total для каждой группы, разбитой на advertiser_id. (Общий столбец в исходном кадре данных неверен, поэтому я хочу правильно рассчитать)

Нечто подобное должно быть:

w = Window.partitionBy("advertiser_id").orderBy("advertiser_id")

df.withColumn("total_aux", when( lag("advertiser_id").over(w) == col("advertiser_id"), lag("total_aux").over(w) + col("amount") ).otherwise( col("max_total_advertiser") + col("amount") ))

Это lag("total_aux") не работает, потому что столбец еще не сгенерирован, это то, чего я хочу добиться, если это первая строка в группе, суммируйте столбцы в той же строке, если не суммируйте предыдущее полученное значение с текущее количество поле. Пример вывода:

advertiser_id| name | amount    | total_aux             |
4061         |source1|-434.955284|-356373.494791    |
4061         |source2|-594.012216|-356967.507007    | 
4061         |source3|-461.773929|-357429.280936    |

Спасибо.

Ответы [ 2 ]

1 голос
/ 06 июля 2019

Я предполагаю, что name - это отдельное значение для каждого advertiser_id, и поэтому ваш набор данных сортируется по name.Я также предполагаю, что max_total_advertiser содержит одинаковое значение для каждого advertiser_id.Если это не так, добавьте комментарий.

Вам нужно окно rangeBetween , в котором отображаются все предшествующие и последующие строки в указанном диапазоне.Мы будем использовать Window.unboundedPreceding, так как мы хотим суммировать все предыдущие значения.

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [
(4061, 'source1',-434.955284,-354882.75336200005, -355938.53950700007),
(4061, 'source2',-594.012216,-355476.76557800005, -345938.53950700007),
(4062, 'source1',-594.012216,-355476.76557800005, -5938.53950700007),
(4062, 'source2',-594.012216,-355476.76557800005, -5938.53950700007),
(4061, 'source3',-461.773929,-355938.53950700007, -355938.53950700007)
]

columns = ['advertiser_id','name' ,'amount', 'total', 'max_total_advertiser']

df=spark.createDataFrame(l, columns)

w = Window.partitionBy('advertiser_id').orderBy('name').rangeBetween(Window.unboundedPreceding, 0)

df = df.withColumn('total', F.sum('amount').over(w) + df.max_total_advertiser)
df.show()

Вывод:

+-------------+-------+-----------+-------------------+--------------------+ 
|advertiser_id|   name|     amount|              total|max_total_advertiser| 
+-------------+-------+-----------+-------------------+--------------------+ 
|         4062|source1|-594.012216|-6532.5517230000705|   -5938.53950700007| 
|         4062|source2|-594.012216| -7126.563939000071|   -5938.53950700007| 
|         4061|source1|-434.955284| -356373.4947910001| -355938.53950700007| 
|         4061|source2|-594.012216| -346967.5070070001| -345938.53950700007| 
|         4061|source3|-461.773929|-357429.28093600005| -355938.53950700007| 
+-------------+-------+-----------+-------------------+--------------------+
0 голосов
/ 06 июля 2019

Возможно, вы ищете функцию orderBy(). Это работает?

from pyspark.sql.window import *

df.withColumn("cumulativeSum", sum(df("amount"))
             .over( Window.partitionBy("advertiser_id").orderBy("amount")))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...