Применить проверку с некоторыми столбцами в Spark SQL - PullRequest
0 голосов
/ 27 июня 2018

кадр данных проверки:

+---------+---------------------------+-------------------------+
|dataframe|Validation Checks          |cols                     |
+---------+---------------------------+-------------------------+
|Attendee |isEmpty,IsNull             |col1,col2,col3           |
+---------+---------------------------+-------------------------+

Дата-кадр участника:

    col1    col2    col3
    a1       a2     a3
             b2     b3
    c1       c2     c3
    d1       d2     d3

Ожидаемый результат, фрейм данных:

    col1    col2    col3   status
    a1       a2     a3      clean
             b2     b3      dirty  
    c1       c2     c3      clean
    d1       d2     d3      clean

Используемый код:

var columns = df.columns //struct(df.columns map col: _*) 
val colDF = df.select(col("dataframe")) 
var tablename = colDF.head().toSeq 
val checkDF = df.select(col("Validation Checks")) 
val opsColDF = df.select(col("cols")) 
val opsColumn = opsColDF.columns println("opsColumn :::" + opsColumn) 

Ответы [ 2 ]

0 голосов
/ 29 июня 2018
package com.incedo.pharma
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct}
object objValidation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
     .builder
     .appName("columncheck")
     .master("local[*]")
     .getOrCreate()
     val df = spark.read.format("com.databricks.spark.csv")
              .option("header", true)
              .option("delimiter", ",")
              .load("tablecolcheck.csv")
     println("validation dataframe :::::"+df.show())
     var AttendeeDF = df
     var tableNameArray = df.select(col("tablename")).collect().toSeq
     val dataframeWithQuery = df.withColumn("query", createQueryUdf(df("tablename"), df("Validation Checks"), df("cols")))
     println("dataframeWithQuery ---------------------"+dataframeWithQuery.show(false))
     tableNameArray.foreach(tableArray => {
       AttendeeDF = spark.read.format("com.databricks.spark.csv")
                         .option("header", true)
                         .option("delimiter", ",")
                         //.load("AttendeeTable.csv")
                         .load(tableArray.get(0)+".csv")
       println("AttendeeDF ::::"+AttendeeDF.show(false))                    
       AttendeeDF.createOrReplaceTempView("AttendeeTable")
       var queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
       println("queryArray ----"+queryArray.toSeq)
       for(query <- queryArray){
         spark.sql(query).show(false)
       }
     })
  }

  def createQueryUdf = udf((table: String, logic: String, cols: String) => {
  "select *, case when "+
    cols.split(",")
      .map(_.trim)

      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map{
            case y if (y == "isempty") => s"$x like ''"
            case y if (y == "gt>3") => s"length($x) > 3"
            case y => s"$y($x)"
        }.mkString(" or "))
      .mkString(" or ") +
  s" then 'dirty' else 'clean' end as status from $table"
})
}
0 голосов
/ 28 июня 2018

Если у вас есть dataframe как

+---------+-----------------+--------------+
|dataframe|Validation Checks|cols          |
+---------+-----------------+--------------+
|Attendee |isEmpty,isNull   |col1,col2,col3|
+---------+-----------------+--------------+

Вы должны сделать SQL-запрос, используя значения столбца . Я создал еще один столбец, используя функцию udf, делая правильный запрос

import org.apache.spark.sql.functions._
def createQueryUdf = udf((table: String, logic: String, cols: String) => {
  "select *, case when "+
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map{
            case y if (y == "isempty") => s"$x like ''"
            case y => s"$y($x)"
        }.mkString(" or "))
      .mkString(" or ") +
  s" then 'dirty' else 'clean' end as status from $table"
})

val dataframeWithQuery = df.withColumn("query", createQueryUdf(col("dataframe"), col("Validation Checks"), col("cols")))

так что dataframeWithQuery будет

+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dataframe|Validation Checks|cols          |query                                                                                                                                                                 |
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Attendee |isEmpty,isNull   |col1,col2,col3|select *, case when col1 like '' or isnull(col1) or col2 like '' or isnull(col2) or col3 like '' or isnull(col3) then 'dirty' else 'clean' end as status from Attendee|
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Теперь вы можете выбрать действительный запрос для попадания на фреймы данных, но до этого фреймы данных все зарегистрированы как

attendee.createOrReplaceTempView("Attendee")

Тогда вы можете просто collect столбец запроса и loop , чтобы применить операторы запроса

val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))

for(query <- queryArray){
  spark.sql(query).show(false)
}

что должно дать вам

+----+----+----+------+
|col1|col2|col3|status|
+----+----+----+------+
|a1  |a2  |a3  |clean |
|    |b2  |b3  |dirty |
|c1  |c2  |c3  |clean |
|d1  |d2  |d3  |clean |
+----+----+----+------+

К настоящему времени вы должны иметь представление о том, как действовать дальше. Я надеюсь, что ответ полезен

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...