Функция окна Pyspark с фильтром на другой столбец - PullRequest
0 голосов
/ 03 декабря 2018

У меня есть фрейм данных pyspark со следующими данными:

| y | date       | amount| id |
 ----------------------------- 
| 1 | 2017-01-01 | 10    | 1  |
| 0 | 2017-01-01 | 2     | 1  |
| 1 | 2017-01-02 | 20    | 1  |
| 0 | 2017-01-02 | 3     | 1  |
| 1 | 2017-01-03 | 2     | 1  |
| 0 | 2017-01-03 | 5     | 1  |

Я хочу применить оконную функцию, но применяю агрегатную функцию sum только к столбцам с y == 1, но все еще поддерживаюдругие столбцы.Окно, которое я бы применил:

w = Window \
        .partitionBy(df.id) \
        .orderBy(df.date.asc()) \
        .rowsBetween(Window.unboundedPreceding, -1)

И результирующий фрейм данных будет выглядеть так:

| y | date       | amount| id | sum |
 ----------------------------------- 
| 1 | 2017-01-01 | 10    | 1  | 0   |
| 0 | 2017-01-01 | 2     | 1  | 0   |
| 1 | 2017-01-02 | 20    | 1  | 10  | // =10 (considering only the row with y==1)
| 0 | 2017-01-02 | 3     | 1  | 10  | // same as above
| 1 | 2017-01-03 | 2     | 1  | 30  | // =10+20
| 0 | 2017-01-03 | 5     | 1  | 30  | // same as above

Возможно ли это в любом случае?

Я пытался использовать sum(when(df.y==1, df.amount)).over(w), но не дал правильных результатов.

1 Ответ

0 голосов
/ 03 декабря 2018

На самом деле это трудно справиться с помощью одной оконной функции.Я думаю, что вы должны сначала создать несколько фиктивных столбцов, чтобы вычислить столбец суммы.Вы можете найти мое решение ниже.

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> 
>>> df.show()
+---+----------+------+---+
|  y|      date|amount| id|
+---+----------+------+---+
|  1|2017-01-01|    10|  1|
|  0|2017-01-01|     2|  1|
|  1|2017-01-02|    20|  1|
|  0|2017-01-02|     3|  1|
|  1|2017-01-03|     2|  1|
|  0|2017-01-03|     5|  1|
+---+----------+------+---+

>>> 
>>> df = df.withColumn('c1', F.when(F.col('y')==1,F.col('amount')).otherwise(0))
>>> 
>>> window1 = Window.partitionBy(df.id).orderBy(df.date.asc()).rowsBetween(Window.unboundedPreceding, -1)
>>> df = df.withColumn('c2', F.sum(df.c1).over(window1)).fillna(0)
>>> 
>>> window2 = Window.partitionBy(df.id).orderBy(df.date.asc())
>>> df = df.withColumn('c3', F.lag(df.c2).over(window2)).fillna(0)
>>> 
>>> df = df.withColumn('sum', F.when(df.y==0,df.c3).otherwise(df.c2))
>>> 
>>> df = df.select('y','date','amount','id','sum')
>>> 
>>> df.show()
+---+----------+------+---+---+                                                 
|  y|      date|amount| id|sum|
+---+----------+------+---+---+
|  1|2017-01-01|    10|  1|  0|
|  0|2017-01-01|     2|  1|  0|
|  1|2017-01-02|    20|  1| 10|
|  0|2017-01-02|     3|  1| 10|
|  1|2017-01-03|     2|  1| 30|
|  0|2017-01-03|     5|  1| 30|
+---+----------+------+---+---+

Это решение может не сработать, если там есть несколько строк y = 1 или y = 0 в день, пожалуйста, рассмотрите его

...