Да, это можно сделать с помощью .lead()
import org.apache.spark.sql.expressions._
//define window specification
val windowSpec = Window.partitionBy($"type",$"f1").orderBy($"type")
val inputDF = sc.parallelize(List((1,"a","xy",11),(2,"b","ab",13),(3,"c","na",16),(3,"c","dir",18),(3,"c","ls",23))).toDF("type","f1","f2","value")
inputDF.withColumn("leadValue",lead($"value",1).over(windowSpec))
.withColumn("result", when(abs($"leadValue" - $"value") <= 2, 1).otherwise(0)) //check for condition
.filter($"result" === 0) //filter the rows
.drop("leadValue","result") //remove additional columns
.orderBy($"type")
.show
Вывод:
+----+---+---+-----+
|type| f1| f2|value|
+----+---+---+-----+
| 1| a| xy| 11|
| 2| b| ab| 13|
| 3| c|dir| 18|
| 3| c| ls| 23|
+----+---+---+-----+
Здесь, как мы уже делим на type
& f1
, нам не нужнопроверить их условие равенства