I had to update your data slightly, because the dates in your example are not fully ordered. Using lag together with a cumulative sum should cover your use case. Here is a working PySpark version:
from pyspark.sql import SparkSession, Window
# note: importing sum from pyspark.sql.functions shadows the built-in sum in this script
from pyspark.sql.functions import col, to_timestamp, when, lit, datediff, lag, sum

# create or reuse a SparkSession (skip this line if you already have one)
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (1, "04/10/2018", 1),
        (1, "27/11/2018", 2),
        (1, "27/11/2018", 2),
        (1, "27/11/2018", 2),
        (1, "27/12/2018", 3),
        (1, "27/01/2019", 4),
        (1, "27/02/2019", 5),
        (1, "27/03/2019", 6),
        (1, "27/04/2019", 7),
        (1, "27/05/2019", 8),
        (2, "28/12/2018", 1),
        (2, "28/12/2018", 1),
        (2, "28/12/2018", 1),
        (2, "09/01/2019", 2),
        (2, "09/01/2019", 2),
        (2, "15/02/2019", 3),
        (2, "15/02/2019", 3),
        (2, "15/02/2019", 3),
        (2, "28/02/2019", 4),
        (2, "28/02/2019", 4),
        (2, "02/04/2019", 5),
        (2, "08/04/2019", 6),
        (2, "08/04/2019", 6),
        (2, "08/04/2019", 6),
        (2, "09/04/2019", 6),
        (2, "10/04/2019", 6),
        (2, "10/04/2019", 6),
        (2, "29/04/2019", 7),
        (2, "06/05/2019", 8),
        (2, "06/05/2019", 8),
        (2, "06/05/2019", 8),
        (2, "06/05/2019", 8),
        (2, "06/05/2019", 8),
        (2, "20/09/2019", 9),
        (2, "20/09/2019", 9),
        (2, "05/10/2019", 10),
        (2, "05/10/2019", 10),
        (2, "22/03/2020", 11),
        (2, "22/03/2020", 11),
        (2, "17/05/2020", 12),
        (3, "20/09/2018", 1),
        (3, "20/09/2018", 1),
        (3, "20/09/2018", 1),
        (3, "12/10/2018", 2),
        (3, "12/10/2018", 2),
        (3, "09/11/2018", 3),
        (3, "20/12/2018", 4),
        (3, "22/03/2019", 5),
        (3, "22/03/2019", 5),
        (3, "09/04/2019", 6),
    ],
    ["dept", "date", "target"],
)
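Before building the windows, you can sanity-check the dd/MM/yyyy parsing on its own, for example:

df.withColumn("date_parsed", to_timestamp(col("date"), "dd/MM/yyyy")) \
    .select("date", "date_parsed") \
    .show(3)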
# window for comparing each row with the previous one in the same dept
window = Window.partitionBy("dept").orderBy("date_parsed")

# running-total window; rangeBetween treats rows with the same date_parsed
# as peers, so duplicated dates all receive the same cumulative sum
window_cusum = (
    Window
    .partitionBy("dept")
    .orderBy("date_parsed")
    .rangeBetween(Window.unboundedPreceding, 0)
)
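A quick aside on why the running-total window uses rangeBetween rather than rowsBetween: with rangeBetween, rows that share the same date_parsed value belong to the same frame and get one identical running total, whereas rowsBetween would increment within the ties in whatever order Spark happens to return them. A minimal sketch on toy data (the toy frame and its day column are just for illustration):

toy = spark.createDataFrame([(1,), (1,), (2,)], ["day"])
w_range = Window.orderBy("day").rangeBetween(Window.unboundedPreceding, 0)
w_rows = Window.orderBy("day").rowsBetween(Window.unboundedPreceding, 0)
toy.select(
    "day",
    sum(lit(1)).over(w_range).alias("range_count"),  # ties share one value: 2, 2, 3
    sum(lit(1)).over(w_rows).alias("rows_count"),    # ties split: 1, 2, 3
).show()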
final_df = (
    df
    # parse the dd/MM/yyyy strings into real timestamps so they sort correctly
    .withColumn("date_parsed", to_timestamp(col("date"), "dd/MM/yyyy"))
    # diff is True when the row falls within 2 days of the previous row;
    # the first row of each dept has a null lag and lands in otherwise(False)
    .withColumn(
        "diff",
        when(
            datediff(col("date_parsed"), lag(col("date_parsed")).over(window)) <= 2,
            True,
        ).otherwise(False),
    )
    # every False starts a new group, so a running count of the Falses
    # rebuilds the target numbering
    .withColumn(
        "cusum_of_false",
        sum(when(~col("diff"), lit(1)).otherwise(lit(0))).over(window_cusum),
    )
    # compare the rebuilt numbering against the expected target
    .withColumn("check_working", col("target") == col("cusum_of_false"))
)
final_df.orderBy("dept", "date_parsed", "cusum_of_false").show()

# sanity check: cusum_of_false reproduces target on every row
row_count = final_df.count()
check_working = final_df.agg(
    sum(when(col("check_working"), lit(1)).otherwise(lit(0)))
).collect()[0][0]
assert row_count == check_working
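If you only need the recomputed numbering, one way to finish is to keep just the original columns, with cusum_of_false standing in for target, for example:

result = (
    final_df
    .orderBy("dept", "date_parsed")
    .select("dept", "date", col("cusum_of_false").alias("target"))
)
result.show()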