Не пытался использовать LAG, думаю, что также будет работать. Но посмотрел на ваше требование с joinWith и решил применить логи c по соображениям производительности. Многие шаги в работе пропущены. Используются разные имена, вы можете абстрагироваться, переименовывать и отбрасывать столбцы.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class mtr0(mtr: String, seqNum: Int)
case class mtr(mtr: String, seqNum: Int, rank: Int)
// Gen data & optimize for JOINing, just interested in max 2 records for ranked sets.
val curr0 = Seq(
mtr0("m1", 1),
mtr0("m1", 2),
mtr0("m1", 3),
mtr0("m2", 7)
).toDS
val curr1 = curr0.withColumn("rank", row_number().over(Window.partitionBy($"mtr").orderBy($"seqNum".desc)))
// Reduce before JOIN.
val currF=curr1.filter($"rank" === 1 ).as[mtr]
//currF.show(false)
val prevF=curr1.filter($"rank" === 2 ).as[mtr]
//prevF.show(false)
val selfDF = currF.as("curr").joinWith(prevF.as("prev"),
( col("curr.mtr") === col("prev.mtr") && (col("curr.rank") === 1) && (col("prev.rank") === 2)),"left")
// Null value evident when only 1 entry per meter.
selfDF.show(false)
возвращает:
+----------+----------+
|_1 |_2 |
+----------+----------+
|[m1, 3, 1]|[m1, 2, 2]|
|[m2, 7, 1]|null |
+----------+----------+
selfDF: org.apache.spark.sql.Dataset[(mtr, mtr)] = [_1: struct<mtr: string, seqNum: int ... 1 more field>, _2: struct<mtr: string, seqNum: int ... 1 more field>]