Просто используйте when
/ otherwise
с foldLeft
:
import org.apache.spark.sql.functions._
val conditions = Map(
"item1" -> Seq("foo", "bar"), "item2" -> Seq("foobar")
)
conditions.foldLeft(lit(null)){
case (acc, (v, ls)) => when(col("col1").isin(ls: _*), v).otherwise(acc)
}
, что создаст вложенное CASE WHEN
выражение формы:
CASE WHEN (col1 IN (foobar)) THEN item2
ELSE CASE WHEN (col1 IN (foo, bar)) THEN item1
ELSE NULL
END
END
Вы можете заменитьlit(null)
с другим значением, которое вы хотите использовать в качестве базового результата (если никакое значение не соответствует).
Вы также можете сгенерировать
CASE
WHEN (col1 IN (foo, bar)) THEN item1
WHEN (col1 IN (foobar)) THEN item2
END
, используя рекурсивную функцию, подобную этой:
import org.apache.spark.sql.Column
def mergeConditions(conditions: Map[String, Seq[String]], c: Column) = {
def mergeConditionsR(conditions: Seq[(String, Seq[String])], acc: Column): Column = conditions match {
case (v, ls) :: t => mergeConditionsR(t, acc.when(c.isin(ls: _*), v))
case Nil => acc
}
conditions.toList match {
case (v, ls) :: t => mergeConditionsR(t, when(c.isin(ls: _*), v))
case Nil => lit(null)
}
}
mergeConditions(conditions, col("col1"))
но это не должно иметь большого значения.
При простом входе можно, конечно, полностью пропустить when
:
import org.apache.spark.sql.functions.typedLit
val conditionsCol = typedLit(conditions.flatMap {
case (k, vs) => vs.map { v => (v, k) }
}.toMap)
df.withColumn("value", conditionsCol($"col1"))
илипреобразование conditions
в DataFrame
и соединение.
conditions.toSeq.toDF("value", "col1")
.withColumn("col1", explode($"col1"))
.join(df, Seq("col1"), "rightouter")