Удалите Null из столбцов массива в Dataframe в Scala с помощью Spark (1.6) - PullRequest
0 голосов
/ 07 мая 2018

У меня есть фрейм данных с ключевым столбцом и столбцом, который имеет массив struct. Схема выглядит следующим образом.

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: long (nullable = false)

Массив "desc" может иметь любое количество нулевых значений. Я хотел бы создать окончательный фрейм данных с массивом, не имеющим ни одного из нулевых значений, используя spark 1.6:

Примером может быть:

Key  .   Value
1010 .   [[George,21],null,[MARIE,13],null]
1023 .   [null,[Watson,11],[John,35],null,[Kyle,33]]

Я хочу получить окончательный кадр данных как:

Key  .   Value
1010 .   [[George,21],[MARIE,13]]
1023 .   [[Watson,11],[John,35],[Kyle,33]]

Я пытался сделать это с UDF и case-классом, но получил

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to....

Любая помощь очень ценится, и я предпочел бы делать это без преобразования в СДР, если это необходимо. Кроме того, я новичок в Spark и Scala, поэтому заранее спасибо !!!

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Вот другая версия:

case class Person(name: String, age: Int)

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = false)

+----+-----------------------------------------------+
|id  |desc                                           |
+----+-----------------------------------------------+
|1010|[[George,21], null, [MARIE,13], null]          |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|
+----+-----------------------------------------------+


val filterOutNull = udf((xs: Seq[Row]) => {
  xs.flatMap {
    case null => Nil
    // convert the Row back to your specific struct:
    case Row(s: String,i: Int) => List(Person(s, i))
  }
})

val result = df.withColumn("filteredListDesc", filterOutNull($"desc"))

+----+-----------------------------------------------+-----------------------------------+
|id  |desc                                           |filteredListDesc                   |
+----+-----------------------------------------------+-----------------------------------+
|1010|[[George,21], null, [MARIE,13], null]          |[[George,21], [MARIE,13]]          |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------------------+-----------------------------------+
0 голосов
/ 07 мая 2018

Учитывая, что исходный кадр данных имеет следующую схему

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: long (nullable = false)

Определение udf функции для удаления нулевых значений из массива должно работать для вас

import org.apache.spark.sql.functions._
def removeNull = udf((array: Seq[Row])=> array.filterNot(_ == null).map(x => element(x.getAs[String]("name"), x.getAs[Long]("age"))))

df.withColumn("desc", removeNull(col("desc")))

, где element является case class

case class element(name: String, age: Long)

и вы должны получить

+----+-----------------------------------+
|id  |desc                               |
+----+-----------------------------------+
|1010|[[George,21], [MARIE,13]]          |
|1010|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------+
...