Я использую .rangeBetween (Window.unboundedPreceding, 0).
Эта функция ищет в текущем значении добавленное значение для обратного
import pyspark
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql import Window
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
error = 'This is error'
l = [('1/1/2019' , 1 , 10),
('2/1/2019', 2 , 20 ),
('3/1/2019', 1 , 30 ),
('4/1/2019', 1 , 10 ),
('5/1/2019', 1 , 22 ),
('7/1/2019', 1 , 23 ),
('22/1/2019', 2 , 5),
('11/1/2019', 2 , 24),
('13/2/2019', 1 , 16),
('14/2/2019', 2 , 18),
('5/2/2019', 1 , 19),
('6/2/2019', 2 , 23),
('7/2/2019', 1 , 14),
('8/3/2019', 1 , 50),
('8/3/2019', 2 , 50)]
columns = ['date', 'vin', 'mileage']
df=spark.createDataFrame(l, columns)
df = df.withColumn('date',F.to_date(df.date, 'dd/MM/yyyy'))
df = df.withColumn("max", lit(0))
df = df.withColumn("error_code", lit(''))
w = Window.partitionBy('vin').orderBy('date').rangeBetween(Window.unboundedPreceding,0)
df = df.withColumn('max',F.max('mileage').over(w))
df = df.withColumn('error_code', F.when(F.col('mileage') < F.col('max'), F.lit('ERROR')).otherwise(F.lit('')))
df.show()
![enter image description here](https://i.stack.imgur.com/LYmpq.png)
Наконец, осталось только удалить столбец с максимальным значением
df = df.drop('max')
df.show()
![enter image description here](https://i.stack.imgur.com/RhbiP.png)