Фрейм данных Pyspark, итерация между флагами на основе группы - PullRequest
0 голосов
/ 07 октября 2018

мне нужно создать счетчик между событиями в фрейме данных pyspark, например:

input:

+-------+----+------+  
|machine|date|event |
+-------+----+------+  
| M1    |DAY1|     1|
| M1    |DAY2|     0|
| M1    |DAY3|     0|
| M1    |DAY4|     1|
| M1    |DAY5|     0|
+-------+----+------+ 

Ожидаемый результат:

+-------+----+------+----------------------+  
|machine|date|event |days since last event |
+-------+----+------+----------------------+  
| M1    |DAY1|     1|                     0|
| M1    |DAY2|     0|                     1|
| M1    |DAY3|     0|                     2|
| M1    |DAY4|     1|                     3|
| M1    |DAY5|     0|                     1|
+-------+----+------+----------------------+ 

Я видело Window функций, но я не могу понять, как сделать оператор if, который перезапускает счетчик после того, как он наткнулся на другое событие флага.

Любые идеи о том, как я могу это сделать

1 Ответ

0 голосов
/ 08 октября 2018

Вам нужно использовать несколько оконных функций для этого случая.Вы можете найти мое решение ниже

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> 
>>> df = sc.parallelize([
...     ('M1','DAY1',1),
...     ('M1','DAY2',0),
...     ('M1','DAY3',0),
...     ('M1','DAY4',1),
...     ('M1','DAY5',0)
...     ]).toDF(['machine','date','event'])
>>> 
>>> df.show()
+-------+----+-----+
|machine|date|event|
+-------+----+-----+
|     M1|DAY1|    1|
|     M1|DAY2|    0|
|     M1|DAY3|    0|
|     M1|DAY4|    1|
|     M1|DAY5|    0|
+-------+----+-----+

>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine','new_col').orderBy('date')
>>> 
>>> df = df.withColumn('new_col', F.sum(F.lag('event').over(window1)).over(window1))
>>> df = df.withColumn('days_since_last_event', F.when(F.isnull('new_col')==True,0).otherwise(F.rank().over(window2)))
>>> 
>>> df = df.drop('new_col')
>>> 
>>> df.show()
+-------+----+-----+---------------------+                                      
|machine|date|event|days_since_last_event|
+-------+----+-----+---------------------+
|     M1|DAY1|    1|                    0|
|     M1|DAY2|    0|                    1|
|     M1|DAY3|    0|                    2|
|     M1|DAY4|    1|                    3|
|     M1|DAY5|    0|                    1|
+-------+----+-----+---------------------+
...