You need to use several window functions for this case: the first builds a running group id from the event flag, and the second ranks rows within each group to get the number of days since the last event. You can find my solution below (a pyspark shell session, so sc is already defined).
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>>
>>> df = sc.parallelize([
... ('M1','DAY1',1),
... ('M1','DAY2',0),
... ('M1','DAY3',0),
... ('M1','DAY4',1),
... ('M1','DAY5',0)
... ]).toDF(['machine','date','event'])
>>>
>>> df.show()
+-------+----+-----+
|machine|date|event|
+-------+----+-----+
|     M1|DAY1|    1|
|     M1|DAY2|    0|
|     M1|DAY3|    0|
|     M1|DAY4|    1|
|     M1|DAY5|    0|
+-------+----+-----+
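>>> # window1: rows per machine in date order; window2 additionally partitions
>>> # by the running group id (new_col) computed below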
>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine','new_col').orderBy('date')
>>>
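>>> # lag('event') shifts the flag down one row, so its running sum only
>>> # increases on the row after an event; every row following an event, up to
>>> # and including the next event, shares the same new_col value (one group)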
>>> df = df.withColumn('new_col', F.sum(F.lag('event').over(window1)).over(window1))
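>>> # rank() within each (machine, new_col) group counts the days elapsed since
>>> # the previous event; rows before the first event (new_col is null) get 0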
>>> df = df.withColumn('days_since_last_event', F.when(F.isnull('new_col'), 0).otherwise(F.rank().over(window2)))
>>>
>>> df = df.drop('new_col')
>>>
>>> df.show()
+-------+----+-----+---------------------+
|machine|date|event|days_since_last_event|
+-------+----+-----+---------------------+
|     M1|DAY1|    1|                    0|
|     M1|DAY2|    0|                    1|
|     M1|DAY3|    0|                    2|
|     M1|DAY4|    1|                    3|
|     M1|DAY5|    0|                    1|
+-------+----+-----+---------------------+
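One caveat: orderBy('date') only works here because DAY1 … DAY5 happen to sort correctly as strings; a label like DAY10 would sort before DAY2, and real timestamps need a proper type. A minimal sketch, assuming your actual data carries ISO date strings such as '2019-01-01' instead of the DAY1 labels used above:
>>> # cast the string to DateType first, then define the same windows on it
>>> df = df.withColumn('date', F.to_date('date', 'yyyy-MM-dd'))
>>> window1 = Window.partitionBy('machine').orderBy('date')
>>> window2 = Window.partitionBy('machine', 'new_col').orderBy('date')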