Здесь нет необходимости в итеративном объединении.Создайте литерал map<string, struct<double, double>>
, используя o.a.s.sql.functions.map
(в функциональном смысле он ведет себя как задержанный string => struct<lower: dobule, upper: double>
)
import org.apache.spark.sql.functions._
val bins: Seq[(String, Double Double)] = Seq(
("bin1",1.0,2.0),("bin2",3.0,4.0),("bin3",5.0,6.0))
val binCol = map(bins.map {
case (key, lower, upper) => Seq(
lit(key),
struct(lit(lower) as "lower", lit(upper) as "upper"))
}.flatten: _*)
определяют выражения, подобные этим (это простой поиск в предопределенном отображении,поэтому binCol(col(refCol))
задерживается struct<lower: dobule, upper: double>
, а оставшееся apply
занимает поле lower
или upper
):
val lower = binCol(col(refCol))("lower")
val upper = binCol(col(refCol))("upper")
val c = col(colName)
и используется CASE ... WHEN ...
( искровой эквивалент IF тогда.ELSE )
val result = when(c.between(lower, upper), c)
.when(c < lower, lower)
.when(c > upper, upper)
выберите и отбросьте NULL
s:
df
.withColumn(colName, result)
// If value is still NULL it means we didn't find refCol key in binCol keys.
// To mimic .filter(col(refCol) === ...) we drop the rows
.na.drop(Seq(colName))
В этом решении предполагается, что в начале colName
нет значений NULL
, но может быть легко настроен для обработки случаев, когда это предположение не выполняется.
Если процесс все еще неясен, я бы рекомендовал отслеживать его поэтапно с литералами:
spark.range(1).select(binCol as "map").show(false)
+------------------------------------------------------------+
|map |
+------------------------------------------------------------+
|[bin1 -> [1.0, 2.0], bin2 -> [3.0, 4.0], bin3 -> [5.0, 6.0]]|
+------------------------------------------------------------+
spark.range(1).select(binCol(lit("bin1")) as "value").show(false)
+----------+
|value |
+----------+
|[1.0, 2.0]|
+----------+
spark.range(1).select(binCol(lit("bin1"))("lower") as "value").show
+-----+
|value|
+-----+
| 1.0|
+-----+
и далее со ссылкой на Запрос Spark SQL DataFrame со сложными типами .