Решение без UDF
Это будет работать до тех пор, пока ваш logicDF достаточно мал для сбора в драйвер.
Шаг 1
Соберите свою логику c в Array[(Int, String)]
, как:
val rules = logicDF.collect().map{ r: Row =>
val slNo = r.getAs[Int](0)
val condition = r.getAs[String](1)
(slNo, condition)
}
Шаг 2
Построить новый столбец с условными значениями, объединяющими эти правила в когда Column
. Для этого используйте цикл scala, например:
val unused = when(lit(false), lit(false))
val filters: Column = rules.foldLeft(unused){
case (acc: Column, (slNo: Int, cond: String)) =>
acc.when(col("slNo") === slNo, expr(cond))
}
//You will get something like:
//when(col("slNo") === 1, expr("age > 10"))
//.when(col("slNo") === 2, expr("age > 20"))
//...
Шаг 3
Получите декартово произведение обоих DataFrames с объединением, чтобы вы могли применить каждое правило к каждой строке в Ваши данные:
val joinDF = logicDF.join(inputDF, lit(true), "inner") //inner or whatever
Шаг 4
Фильтр с использованием предыдущего Column
с условными фильтрами.
val withRulesDF = joinDF.filter(filters)
Шаг 5
Группировка и количество:
val resultDF = withRulesDF
.groupBy("slNo", "filterCondtion")
.agg(count("*") as "count")