Если у вас есть dataframe
как
+---------+-----------------+--------------+
|dataframe|Validation Checks|cols |
+---------+-----------------+--------------+
|Attendee |isEmpty,isNull |col1,col2,col3|
+---------+-----------------+--------------+
Вы должны сделать SQL-запрос, используя значения столбца . Я создал еще один столбец, используя функцию udf
, делая правильный запрос
import org.apache.spark.sql.functions._
def createQueryUdf = udf((table: String, logic: String, cols: String) => {
"select *, case when "+
cols.split(",")
.map(_.trim)
.map(x => logic.split(",")
.map(_.trim.toLowerCase)
.map{
case y if (y == "isempty") => s"$x like ''"
case y => s"$y($x)"
}.mkString(" or "))
.mkString(" or ") +
s" then 'dirty' else 'clean' end as status from $table"
})
val dataframeWithQuery = df.withColumn("query", createQueryUdf(col("dataframe"), col("Validation Checks"), col("cols")))
так что dataframeWithQuery
будет
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dataframe|Validation Checks|cols |query |
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Attendee |isEmpty,isNull |col1,col2,col3|select *, case when col1 like '' or isnull(col1) or col2 like '' or isnull(col2) or col3 like '' or isnull(col3) then 'dirty' else 'clean' end as status from Attendee|
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Теперь вы можете выбрать действительный запрос для попадания на фреймы данных, но до этого фреймы данных все зарегистрированы как
attendee.createOrReplaceTempView("Attendee")
Тогда вы можете просто collect
столбец запроса и loop , чтобы применить операторы запроса
val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
for(query <- queryArray){
spark.sql(query).show(false)
}
что должно дать вам
+----+----+----+------+
|col1|col2|col3|status|
+----+----+----+------+
|a1 |a2 |a3 |clean |
| |b2 |b3 |dirty |
|c1 |c2 |c3 |clean |
|d1 |d2 |d3 |clean |
+----+----+----+------+
К настоящему времени вы должны иметь представление о том, как действовать дальше. Я надеюсь, что ответ полезен