Кадр данных с диапазонами идентификаторов датчиков может быть создан, а затем присоединен к исходному кадру данных:
val df = Seq((1, "specificSensor", Some(1)),
(2, "1234", None),
(3, "1234", None),
(4, "specificSensor", Some(2)),
(5, "2345", None),
(6, "2345", None))
.toDF("ID", "Sensor", "No")
val idWindow = Window.orderBy("ID")
val sensorsRange = df
.where($"Sensor" === "specificSensor")
.withColumn("nextId", coalesce(lead($"id", 1).over(idWindow), lit(Long.MaxValue)))
sensorsRange.show(false)
val joinColumn = $"d.ID" > $"s.id" && $"d.ID" < $"s.nextId"
val result =
df.alias("d")
.join(sensorsRange.alias("s"), joinColumn, "left")
.select($"d.ID", $"d.Sensor", coalesce($"d.No", $"s.No").alias("No"))
Вывод:
+---+--------------+---+-------------------+
|ID |Sensor |No |nextId |
+---+--------------+---+-------------------+
|1 |specificSensor|1 |4 |
|4 |specificSensor|2 |9223372036854775807|
+---+--------------+---+-------------------+
+---+--------------+---+
|ID |Sensor |No |
+---+--------------+---+
|1 |specificSensor|1 |
|2 |1234 |1 |
|3 |1234 |1 |
|4 |specificSensor|2 |
|5 |2345 |2 |
|6 |2345 |2 |
+---+--------------+---+