pyspark использует Window вместе rangeBetween с rowBetween - PullRequest
0 голосов
/ 05 марта 2019

Я хочу суммировать с окном.

данные типа:

user_id     timestamp               date        event
0040b5f0    2018-01-22 13:04:32     2018-01-22  1       
0040b5f0    2018-01-22 13:04:35     2018-01-22  0   
0040b5f0    2018-01-25 18:55:08     2018-01-25  1       
0040b5f0    2018-01-25 18:56:17     2018-01-25  1       
0040b5f0    2018-01-25 20:51:43     2018-01-25  1       
0040b5f0    2018-01-31 07:48:43     2018-01-31  1       
0040b5f0    2018-01-31 07:48:48     2018-01-31  0       
0040b5f0    2018-02-02 09:40:58     2018-02-02  1       
0040b5f0    2018-02-02 09:41:01     2018-02-02  0       
0040b5f0    2018-02-05 14:03:27     2018-02-05  1  

Я хочу получить результат как:

user_id     timestamp               date        event
0040b5f0    2018-01-22 13:04:32     2018-01-22  1===================|   
0040b5f0    2018-01-22 13:04:35     2018-01-22  0----|sum latest      all events
0040b5f0    2018-01-25 18:55:08     2018-01-25  1    3 events      within 3 days 
* 0040b5f0  2018-01-25 18:56:17     2018-01-25  1----|              |
0040b5f0    2018-01-25 20:51:43     2018-01-25  1===================| not include current row      
0040b5f0    2018-01-31 07:48:43     2018-01-31  1       
0040b5f0    2018-01-31 07:48:48     2018-01-31  0       
0040b5f0    2018-02-02 09:40:58     2018-02-02  1       
0040b5f0    2018-02-02 09:41:01     2018-02-02  0       
0040b5f0    2018-02-05 14:03:27     2018-02-05  1  

это окно должно удовлетворять двумусловия:

  1. диапазон между -3 дня и -1.
  2. rowBetween -3 и -1.

Я пытаюсь:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
try:
    sc = SparkContext()
except:
    pass
spark = SparkSession(sc)

data = """0040b5f0    2018-01-22 13:04:32     2018-01-22     1
0040b5f0    2018-01-22 13:04:35     2018-01-22     10
0040b5f0    2018-01-25 18:55:08     2018-01-25     11
0040b5f0    2018-01-25 18:56:17     2018-01-25     12
0040b5f0    2018-01-25 20:51:43     2018-01-25     13
0040b5f0    2018-01-31 07:48:43     2018-01-31     2
0040b5f0    2018-01-31 07:48:48     2018-01-31     3
0040b5f0    2018-02-02 09:40:58     2018-02-02     4
0040b5f0    2018-02-02 09:41:01     2018-02-02     5
0040b5f0    2018-02-05 14:03:27     2018-02-05     1"""

cols = "user_id    timestamp    date    event".split('    ')
df_tmp = pd.DataFrame([i.split('    ') for i in data.split('\n')], columns=cols)

df = spark.createDataFrame(df_tmp)
df = df.withColumn('event', F.col('event').cast('double'))

w = Window().partitionBy(F.col("user_id")).orderBy(F.col("timestamp").cast('timestamp').cast("long"))\
            .rangeBetween(int(Window.currentRow/86400)*86400-86400*3, 0)\
            .orderBy(F.col("timestamp"))\
            .rowsBetween(-3, -1)

df.select('timestamp', 'event', F.sum('event').over(w).alias('event_sum')).show()

получить это:

+-------------------+-----+---------+
|          timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32|  1.0|     null|
|2018-01-22 13:04:35| 10.0|      1.0|
|2018-01-25 18:55:08| 11.0|     11.0|
|2018-01-25 18:56:17| 12.0|     22.0|
|2018-01-25 20:51:43| 13.0|     33.0|
|2018-01-31 07:48:43|  2.0|     36.0|
|2018-01-31 07:48:48|  3.0|     27.0|
|2018-02-02 09:40:58|  4.0|     18.0|
|2018-02-02 09:41:01|  5.0|      9.0|
|2018-02-05 14:03:27|  1.0|     12.0|
+-------------------+-----+---------+

метод rangeBetween является недействительным.Что я хочу, чтобы это было так:

+-------------------+-----+---------+
|          timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32|  1.0|     null|
|2018-01-22 13:04:35| 10.0|      1.0|
|2018-01-25 18:55:08| 11.0|     11.0|
|2018-01-25 18:56:17| 12.0|     22.0|
|2018-01-25 20:51:43| 13.0|     33.0|
|2018-01-31 07:48:43|  2.0|     null|
|2018-01-31 07:48:48|  3.0|      2.0|
|2018-02-02 09:40:58|  4.0|      5.0|
|2018-02-02 09:41:01|  5.0|      9.0|
|2018-02-05 14:03:27|  1.0|      9.0|
+-------------------+-----+---------+

И я использую только rangeBetween как:

w = Window().partitionBy(F.col("user_id")).orderBy(F.col("timestamp").cast('timestamp').cast("long"))\
            .rangeBetween(int(Window.currentRow/86400)*86400-86400*3, -1)

df.select('timestamp', 'event', F.sum('event').over(w).alias('event_sum')).show()

+-------------------+-----+---------+
|          timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32|  1.0|     null|
|2018-01-22 13:04:35| 10.0|      1.0|
|2018-01-25 18:55:08| 11.0|     null|
|2018-01-25 18:56:17| 12.0|     11.0|
|2018-01-25 20:51:43| 13.0|     23.0|
|2018-01-31 07:48:43|  2.0|     null|
|2018-01-31 07:48:48|  3.0|      2.0|
|2018-02-02 09:40:58|  4.0|      5.0|
|2018-02-02 09:41:01|  5.0|      9.0|
|2018-02-05 14:03:27|  1.0|     null|
+-------------------+-----+---------+

Но который я хочу (только рассмотреть дату в 3 дня. Чч: мм: сс следуетигнорируй!) это:

+-------------------+-----+---------+
|          timestamp|event|event_sum|
+-------------------+-----+---------+
|2018-01-22 13:04:32|  1.0|     null|
|2018-01-22 13:04:35| 10.0|      1.0|
|2018-01-25 18:55:08| 11.0|     11.0|
|2018-01-25 18:56:17| 12.0|     22.0|
|2018-01-25 20:51:43| 13.0|     34.0|
|2018-01-31 07:48:43|  2.0|     null|
|2018-01-31 07:48:48|  3.0|      2.0|
|2018-02-02 09:40:58|  4.0|      5.0|
|2018-02-02 09:41:01|  5.0|      9.0|
|2018-02-05 14:03:27|  1.0|      9.0|
+-------------------+-----+---------+
...