Я хочу суммировать с окном.
данные типа:
user_id timestamp date event
0040b5f0 2018-01-22 13:04:32 2018-01-22 1
0040b5f0 2018-01-22 13:04:35 2018-01-22 0
0040b5f0 2018-01-25 18:55:08 2018-01-25 1
0040b5f0 2018-01-25 18:56:17 2018-01-25 1
0040b5f0 2018-01-25 20:51:43 2018-01-25 1
0040b5f0 2018-01-31 07:48:43 2018-01-31 1
0040b5f0 2018-01-31 07:48:48 2018-01-31 0
0040b5f0 2018-02-02 09:40:58 2018-02-02 1
0040b5f0 2018-02-02 09:41:01 2018-02-02 0
0040b5f0 2018-02-05 14:03:27 2018-02-05 1
Я хочу получить результат как:
user_id timestamp date event
0040b5f0 2018-01-22 13:04:32 2018-01-22 1===================|
0040b5f0 2018-01-22 13:04:35 2018-01-22 0----|sum latest all events
0040b5f0 2018-01-25 18:55:08 2018-01-25 1 3 events within 3 days
* 0040b5f0 2018-01-25 18:56:17 2018-01-25 1----| |
0040b5f0 2018-01-25 20:51:43 2018-01-25 1===================| not include current row
0040b5f0 2018-01-31 07:48:43 2018-01-31 1
0040b5f0 2018-01-31 07:48:48 2018-01-31 0
0040b5f0 2018-02-02 09:40:58 2018-02-02 1
0040b5f0 2018-02-02 09:41:01 2018-02-02 0
0040b5f0 2018-02-05 14:03:27 2018-02-05 1
это окно должно удовлетворять двумусловия:
- диапазон между -3 дня и -1.
- rowBetween -3 и -1.
Я пытаюсь:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
try:
sc = SparkContext()
except:
pass
spark = SparkSession(sc)
data = """0040b5f0 2018-01-22 13:04:32 2018-01-22 1
0040b5f0 2018-01-22 13:04:35 2018-01-22 10
0040b5f0 2018-01-25 18:55:08 2018-01-25 11
0040b5f0 2018-01-25 18:56:17 2018-01-25 12
0040b5f0 2018-01-25 20:51:43 2018-01-25 13
0040b5f0 2018-01-31 07:48:43 2018-01-31 2
0040b5f0 2018-01-31 07:48:48 2018-01-31 3
0040b5f0 2018-02-02 09:40:58 2018-02-02 4
0040b5f0 2018-02-02 09:41:01 2018-02-02 5
0040b5f0 2018-02-05 14:03:27 2018-02-05 1"""
cols = "user_id timestamp date event".split(' ')
df_tmp = pd.DataFrame([i.split(' ') for i in data.split('\n')], columns=cols)
df = spark.createDataFrame(df_tmp)
df = df.withColumn('event', F.col('event').cast('double'))
w = Window().partitionBy(F.col("user_id")).orderBy(F.col("timestamp").cast('timestamp').cast("long"))\
.rangeBetween(int(Window.currentRow/86400)*86400-86400*3, 0)\
.orderBy(F.col("timestamp"))\
.rowsBetween(-3, -1)
df.select('timestamp', 'event', F.sum('event').over(w).alias('event_sum')).show()
получить это:
+-------------------+-----+---------+
| timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32| 1.0| null|
|2018-01-22 13:04:35| 10.0| 1.0|
|2018-01-25 18:55:08| 11.0| 11.0|
|2018-01-25 18:56:17| 12.0| 22.0|
|2018-01-25 20:51:43| 13.0| 33.0|
|2018-01-31 07:48:43| 2.0| 36.0|
|2018-01-31 07:48:48| 3.0| 27.0|
|2018-02-02 09:40:58| 4.0| 18.0|
|2018-02-02 09:41:01| 5.0| 9.0|
|2018-02-05 14:03:27| 1.0| 12.0|
+-------------------+-----+---------+
метод rangeBetween является недействительным.Что я хочу, чтобы это было так:
+-------------------+-----+---------+
| timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32| 1.0| null|
|2018-01-22 13:04:35| 10.0| 1.0|
|2018-01-25 18:55:08| 11.0| 11.0|
|2018-01-25 18:56:17| 12.0| 22.0|
|2018-01-25 20:51:43| 13.0| 33.0|
|2018-01-31 07:48:43| 2.0| null|
|2018-01-31 07:48:48| 3.0| 2.0|
|2018-02-02 09:40:58| 4.0| 5.0|
|2018-02-02 09:41:01| 5.0| 9.0|
|2018-02-05 14:03:27| 1.0| 9.0|
+-------------------+-----+---------+
И я использую только rangeBetween как:
w = Window().partitionBy(F.col("user_id")).orderBy(F.col("timestamp").cast('timestamp').cast("long"))\
.rangeBetween(int(Window.currentRow/86400)*86400-86400*3, -1)
df.select('timestamp', 'event', F.sum('event').over(w).alias('event_sum')).show()
+-------------------+-----+---------+
| timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32| 1.0| null|
|2018-01-22 13:04:35| 10.0| 1.0|
|2018-01-25 18:55:08| 11.0| null|
|2018-01-25 18:56:17| 12.0| 11.0|
|2018-01-25 20:51:43| 13.0| 23.0|
|2018-01-31 07:48:43| 2.0| null|
|2018-01-31 07:48:48| 3.0| 2.0|
|2018-02-02 09:40:58| 4.0| 5.0|
|2018-02-02 09:41:01| 5.0| 9.0|
|2018-02-05 14:03:27| 1.0| null|
+-------------------+-----+---------+
Но который я хочу (только рассмотреть дату в 3 дня. Чч: мм: сс следуетигнорируй!) это:
+-------------------+-----+---------+
| timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32| 1.0| null|
|2018-01-22 13:04:35| 10.0| 1.0|
|2018-01-25 18:55:08| 11.0| 11.0|
|2018-01-25 18:56:17| 12.0| 22.0|
|2018-01-25 20:51:43| 13.0| 34.0|
|2018-01-31 07:48:43| 2.0| null|
|2018-01-31 07:48:48| 3.0| 2.0|
|2018-02-02 09:40:58| 4.0| 5.0|
|2018-02-02 09:41:01| 5.0| 9.0|
|2018-02-05 14:03:27| 1.0| 9.0|
+-------------------+-----+---------+