Вот один из способов использования UDF для получения значения evt
для сравнения:
import org.apache.spark.sql.functions._
val df = Seq(
(862304021470656L, 25.284158, 82.435973, 1540980095),
(862304021470656L, 25.284158, 82.435973, 1540980105),
(862304021470656L, 25.284724, 82.434222, 1540980155),
(866561010400483L, 25.284858, 82.433831, 1544980165)
).toDF("id", "lt", "ln", "evt")
val listMap = Map(
862304021470656L -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0),
866561010400483L -> List(0.0, 1.0, 1.0, 2.0, 1.543622306E9, 78.0204, 10.005262, 56.0)
)
def evtLimit(m: Map[Long, List[Double]], evtIdx: Int) = udf(
(id: Long) => m.get(id) match {
case Some(ls) => if (evtIdx < ls.size) ls(evtIdx) else Double.MaxValue
case None => Double.MaxValue
}
)
df.where($"evt" > evtLimit(listMap, 4)($"id")).show
// +---------------+---------+---------+----------+
// | id| lt| ln| evt|
// +---------------+---------+---------+----------+
// |862304021470656|25.284158|82.435973|1540980105|
// |862304021470656|25.284724|82.434222|1540980155|
// |866561010400483|25.284858|82.433831|1544980165|
// +---------------+---------+---------+----------+
Обратите внимание, что UDF возвращает Double.MaxValue
в случае несовпадения ключа или недопустимого значения в предоставленной карте,Это, безусловно, может быть пересмотрено для конкретных бизнес-требований.