Как я могу проверить пустые значения в искровом Dataframe с помощью пользовательских функций - PullRequest
2 голосов
/ 07 июня 2019

ребята, у меня есть эта пользовательская функция, чтобы проверить, являются ли строки текста пустыми:

import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    {{{
      val df = Seq(
        (0, "","Mongo"),
        (1, "World","sql"),
        (2, "","")
        ).toDF("id", "text", "Source")

      // Define a "regular" Scala function
      val checkEmpty: String => Boolean =  x => {
        var test = false
        if(x.isEmpty){
          test = true
        }
        test
      }
      val upper = udf(checkEmpty)
      df.withColumn("isEmpty", upper('text)).show
    }}}

Я на самом деле получаю этот фрейм данных:

+---+-----+------+-------+
| id| text|Source|isEmpty|
+---+-----+------+-------+
|  0|     | Mongo|   true|
|  1|World|   sql|  false|
|  2|     |      |   true|
+---+-----+------+-------+

Как я могпроверить все строки на наличие пустых значений и вернуть сообщение, например:

id 0 содержит текстовый столбец с пустыми значениями
id 2 содержит текст, исходный столбец с пустыми значениями

Ответы [ 2 ]

3 голосов
/ 07 июня 2019

UDF, который получает пустые столбцы в качестве строки, может использоваться для получения пустых имен столбцов. Затем строки с непустыми столбцами могут быть отфильтрованы:

val emptyColumnList = (r: Row) => r
  .toSeq
  .zipWithIndex
  .filter(_._1.toString().isEmpty)
  .map(pair => r.schema.fields(pair._2).name)

val emptyColumnListUDF = udf(emptyColumnList)

val columnsToCheck = Seq($"text", $"Source")
val result = df
  .withColumn("EmptyColumns", emptyColumnListUDF(struct(columnsToCheck: _*)))
  .where(size($"EmptyColumns") > 0)
  .select(format_string("id %s has the %s columns with empty values", $"id", $"EmptyColumns").alias("description"))

Результат:

+----------------------------------------------------+
|description                                         |
+----------------------------------------------------+
|id 0 has the [text] columns with empty values       |
|id 2 has the [text,Source] columns with empty values|
+----------------------------------------------------+
2 голосов
/ 07 июня 2019

Вы можете сделать что-то вроде этого:

case class IsEmptyRow(id: Int, description: String) //case class for column names

val isEmptyDf = df.map { 
   row => row.getInt(row.fieldIndex("id")) -> row //we take id of row as first column
     .toSeq //then to get secod we change row values to seq
     .zip(df.columns) //zip it with column names
     .collect { //if value is string and empty we append column name
        case (value: String, column) if value.isEmpty => column
     }
}.map { //then we create description string and pack results to case class
   case (id, Nil)  => IsEmptyRow(id, s"id $id has no columns with empty values")
   case (id, List(column))  => IsEmptyRow(id, s"id $id has the $column column with empty values")
   case (id, columns) => IsEmptyRow(id, s"id $id has the ${columns.mkString(", ")} columns with empty values")
}

После запуска isEmptyDf.show(truncate = false) отобразится:

+---+---------------------------------------------------+
|id |description                                        |
+---+---------------------------------------------------+
|0  |id 0 has the text columns with empty values        |
|1  |id 1 has no columns with empty values              |
|2  |id 2 has the text, Source columns with empty values|
+---+---------------------------------------------------+

Вы также можете присоединиться к оригиналу dataset:

df.join(isEmptyDf, "id").show(truncate = false)
...