Вы можете использовать неограниченный Window
с только partitionBy
предложением, как показано ниже, или вы можете указать rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
. Я проверил на ваших данных, и он отлично работает. Дайте мне знать, если это работает.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w1=Window().partitionBy("year")
w2=Window().partitionBy("year","week")
w3=Window().partitionBy("year","week","day")
df.withColumn("total_Sales_year", F.sum("qty").over(w1))\
.withColumn("total_sales_by_day", F.sum("qty").over(w3))\
.withColumn("total_sales_week", F.sum("qty").over(w2)).show()
+---+---+----+---+----+----------------+------------------+----------------+
| ID|qty|year|day|week|total_Sales_year|total_sales_by_day|total_sales_week|
+---+---+----+---+----+----------------+------------------+----------------+
| 1| 2|2016| 5| 9| 78| 12| 14|
| 2| 4|2016| 5| 9| 78| 12| 14|
| 3| 0|2016| 5| 9| 78| 12| 14|
| 4| 4|2016| 5| 9| 78| 12| 14|
| 5| 0|2016| 5| 9| 78| 12| 14|
| 6| 2|2016| 5| 9| 78| 12| 14|
| 1| 0|2016| 6| 9| 78| 2| 14|
| 2| 2|2016| 6| 9| 78| 2| 14|
| 3| 0|2016| 6| 9| 78| 2| 14|
| 4| 0|2016| 6| 9| 78| 2| 14|
| 5| 0|2016| 6| 9| 78| 2| 14|
| 6| 0|2016| 6| 9| 78| 2| 14|
| 1| 0|2016| 0| 10| 78| 12| 64|
| 2| 2|2016| 0| 10| 78| 12| 64|
| 3| 2|2016| 0| 10| 78| 12| 64|
| 4| 2|2016| 0| 10| 78| 12| 64|
| 5| 6|2016| 0| 10| 78| 12| 64|
| 6| 0|2016| 0| 10| 78| 12| 64|
| 1| 0|2016| 1| 10| 78| 2| 64|
| 2| 0|2016| 1| 10| 78| 2| 64|
+---+---+----+---+----+----------------+------------------+----------------+
only showing top 20 rows
Дополнительно , если вы хотите столбец total_sales_year_to_day
, вы можете использовать это:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w1=Window().partitionBy("year")
w2=Window().partitionBy("year","week")
w3=Window().partitionBy("year","week","day")
w4=Window().partitionBy("year","week","day").orderBy("day")
w5=Window().partitionBy("year").orderBy("week","day")
df.withColumn("total_Sales_year", F.sum("qty").over(w1))\
.withColumn("total_sales_by_day", F.sum("qty").over(w3))\
.withColumn("total_sales_week", F.sum("qty").over(w2))\
.withColumn("rownum", F.row_number().over(w4))\
.withColumn("newday", F.when(F.col("rownum")!=1, F.lit(0)).otherwise(F.col("total_sales_by_day")))\
.withColumn("total_sales_year_to_day", F.sum("newday").over(w5)).drop("rownum","newday").show()
+---+---+----+---+----+----------------+------------------+----------------+-----------------------+
| ID|qty|year|day|week|total_Sales_year|total_sales_by_day|total_sales_week|total_sales_year_to_day|
+---+---+----+---+----+----------------+------------------+----------------+-----------------------+
| 1| 2|2016| 5| 9| 78| 12| 14| 12|
| 2| 4|2016| 5| 9| 78| 12| 14| 12|
| 3| 0|2016| 5| 9| 78| 12| 14| 12|
| 4| 4|2016| 5| 9| 78| 12| 14| 12|
| 5| 0|2016| 5| 9| 78| 12| 14| 12|
| 6| 2|2016| 5| 9| 78| 12| 14| 12|
| 1| 0|2016| 6| 9| 78| 2| 14| 14|
| 2| 2|2016| 6| 9| 78| 2| 14| 14|
| 3| 0|2016| 6| 9| 78| 2| 14| 14|
| 4| 0|2016| 6| 9| 78| 2| 14| 14|
| 5| 0|2016| 6| 9| 78| 2| 14| 14|
| 6| 0|2016| 6| 9| 78| 2| 14| 14|
| 1| 0|2016| 0| 10| 78| 12| 64| 26|
| 2| 2|2016| 0| 10| 78| 12| 64| 26|
| 3| 2|2016| 0| 10| 78| 12| 64| 26|
| 4| 2|2016| 0| 10| 78| 12| 64| 26|
| 5| 6|2016| 0| 10| 78| 12| 64| 26|
| 6| 0|2016| 0| 10| 78| 12| 64| 26|
| 1| 0|2016| 1| 10| 78| 2| 64| 28|
| 2| 0|2016| 1| 10| 78| 2| 64| 28|
+---+---+----+---+----+----------------+------------------+----------------+-----------------------+
only showing top 20 rows