Вы можете использовать оконные функции. Но это не так просто. Вот пошаговое объяснение кода и логики c.
(в приведенных ниже кодах и объяснении значения uv и updatedValue не совпадают)
1.Читать df
df=spark.read.csv(path, header=True, inferSchema=True)
2. Укажите окно
w=Window.partitionBy("user_id").orderBy("row")
3.Создать столбец, который сравнивает текущее значение только с UB и LB предыдущей строки, если он находится в диапазоне, затем возвращает предыдущую строку currentValue, иначе возвращает ту же строку currentValue, давайте назовем этот столбец "uv"
df2=df.withColumn("uv",when(col("row")==1,col("currentValue"))
.when(col("currentValue").between(lag("UB",1).over(w),
lag("LB",1).over(w)),lag("currentValue",1).over(w))
.otherwise(col("currentValue"))).orderBy("user_id")
df2:
+-------+---+------------+----+----+---+
|user_id|row|currentValue| UB| LB| uv|
+-------+---+------------+----+----+---+
| usr001| 1| 12| 7.2|16.8| 12|
| usr001| 2| 20|12.0|28.0| 20|
| usr001| 3| 17|10.2|23.8| 20|
| usr001| 4| 21|12.6|29.4| 17|
| usr001| 5| 9| 5.4|12.6| 9|
| usr001| 6| 23|13.8|32.2| 23|
| usr002| 1| 11| 6.6|15.4| 11|
| usr002| 2| 10| 6.0|14.0| 11|
| usr002| 3| 15| 9.0|21.0| 15|
| usr002| 4| 3| 1.8| 4.2| 3|
| usr002| 5| 4| 2.4| 5.6| 3|
+-------+---+------------+----+----+---+
4. Это основные логи c, согласно вашим логи c для строки 5 (usr001). Сначала мы должны проверить, заполнена ли строка 4 updatedValue row4 currentValue, если он заполнен, тогда сравните значение строки 5 с границами строки 4, иначе нам нужно перейти к строке, из которой строка 4 updatedValue заполнена, и сравнить с этими границами, чтобы реализовать это на предыдущем шаге, отметив все значения, где currentValue == uv .
df3=df2.withColumn("comp_row", when(col("currentValue")==col("uv"), col("row")))
df3:
+-------+---+------------+----+----+---+--------+
|user_id|row|currentValue| UB| LB| uv|comp_row|
+-------+---+------------+----+----+---+--------+
| usr001| 1| 12| 7.2|16.8| 12| 1|
| usr001| 2| 20|12.0|28.0| 20| 2|
| usr001| 3| 17|10.2|23.8| 20| null|
| usr001| 4| 21|12.6|29.4| 17| null|
| usr001| 5| 9| 5.4|12.6| 9| 5|
| usr001| 6| 23|13.8|32.2| 23| 6|
| usr002| 1| 11| 6.6|15.4| 11| 1|
| usr002| 2| 10| 6.0|14.0| 11| null|
| usr002| 3| 15| 9.0|21.0| 15| 3|
| usr002| 4| 3| 1.8| 4.2| 3| 4|
| usr002| 5| 4| 2.4| 5.6| 3| null|
+-------+---+------------+----+----+---+--------+
5. Теперь, если мы заполним нулевые значения каждой строки, мы получим номер строки, с которой должна сравниваться каждая строка.
df4 = df3.withColumn("comp_row",last("comp_row",True).over(w))
df4:
+-------+---+------------+----+----+---+--------+
|user_id|row|currentValue| UB| LB| uv|comp_row|
+-------+---+------------+----+----+---+--------+
| usr001| 1| 12| 7.2|16.8| 12| 1|
| usr001| 2| 20|12.0|28.0| 20| 2|
| usr001| 3| 17|10.2|23.8| 20| 2|
| usr001| 4| 21|12.6|29.4| 17| 2|
| usr001| 5| 9| 5.4|12.6| 9| 5|
| usr001| 6| 23|13.8|32.2| 23| 6|
| usr002| 1| 11| 6.6|15.4| 11| 1|
| usr002| 2| 10| 6.0|14.0| 11| 1|
| usr002| 3| 15| 9.0|21.0| 15| 3|
| usr002| 4| 3| 1.8| 4.2| 3| 4|
| usr002| 5| 4| 2.4| 5.6| 3| 4|
+-------+---+------------+----+----+---+--------+
Примечание: значения o f comp_row указывает, с какой строкой должна сравниваться следующая строка, например: строка 4 (usr001) comp_row содержит 2, это означает, что строка 5 сравнивается со строкой 2.
6. Теперь мы знаем, какая строка сравнивается с какой строкой, все нам нужно просто получить границы этих строк. Для этого нам нужно объединить строку с comp_row, чтобы мы могли получить границы строки 2 в строке 4.
df5 = df4.select("user_id",col("row").alias("comp_row"),
col("UB").alias("new_UB"),col("LB").alias("new_LB")
,col("currentValue").alias("new_currentValue"))
# Note: Here row is selected as comp_row.
df6=df5.join(df4,["user_id","comp_row"],"inner").orderBy("user_id","row")
df6.select("user_id",
"UB","LB"
,"new_UB","new_LB"
,"currentValue","new_currentValue"
,"row","comp_row").show()
+-------+----+----+------+------+------------+----------------+---+--------+
|user_id| UB| LB|new_UB|new_LB|currentValue|new_currentValue|row|comp_row|
+-------+----+----+------+------+------------+----------------+---+--------+
| usr001| 7.2|16.8| 7.2| 16.8| 12| 12| 1| 1|
| usr001|12.0|28.0| 12.0| 28.0| 20| 20| 2| 2|
| usr001|10.2|23.8| 12.0| 28.0| 17| 20| 3| 2|
| usr001|12.6|29.4| 12.0| 28.0| 21| 20| 4| 2|
| usr001| 5.4|12.6| 5.4| 12.6| 9| 9| 5| 5|
| usr001|13.8|32.2| 13.8| 32.2| 23| 23| 6| 6|
| usr002| 6.6|15.4| 6.6| 15.4| 11| 11| 1| 1|
| usr002| 6.0|14.0| 6.6| 15.4| 10| 11| 2| 1|
| usr002| 9.0|21.0| 9.0| 21.0| 15| 15| 3| 3|
| usr002| 1.8| 4.2| 1.8| 4.2| 3| 3| 4| 4|
| usr002| 2.4| 5.6| 1.8| 4.2| 4| 3| 5| 4|
+-------+----+----+------+------+------------+----------------+---+--------+
7. Последний шаг и Boom !!, сравнить значения тока с новыми границами в предыдущая строка, если она находится в границах, тогда updatedValue = new_currentValue предыдущей строки, в противном случае updatedValue = currentValue той же строки.
df7=df6.withColumn("updatedValue",when(col("row")==1,col("currentValue"))\
.when(col("currentValue").between(lag("new_UB",1).over(w),
lag("new_LB",1).over(w)),lag("new_currentValue",1).over(w))
.otherwise(col("currentValue"))).orderBy("user_id")\
.select("user_id","currentValue","UB","LB","updatedValue")
df7:
+-------+------------+----+----+------------+
|user_id|currentValue| UB| LB|updatedValue|
+-------+------------+----+----+------------+
| usr001| 12| 7.2|16.8| 12|
| usr001| 20|12.0|28.0| 20|
| usr001| 17|10.2|23.8| 20|
| usr001| 21|12.6|29.4| 20|
| usr001| 9| 5.4|12.6| 9|
| usr001| 23|13.8|32.2| 23|
| usr002| 11| 6.6|15.4| 11|
| usr002| 10| 6.0|14.0| 11|
| usr002| 15| 9.0|21.0| 11|
| usr002| 3| 1.8| 4.2| 3|
| usr002| 4| 2.4| 5.6| 3|
+-------+------------+----+----+------------+