Независимо взорвать несколько столбцов в Spark - PullRequest
2 голосов
/ 15 мая 2019

У меня есть схема, где каждая строка содержит несколько столбцов массивов, и я хочу разбить каждый столбец массива независимо друг от друга.

Предположим, у нас есть столбцы:

**userId    someString      varA     varB       someBool
   1        "example1"    [0,2,5]   [1,2,9]        true
   2        "example2"    [1,20,5]  [9,null,6]     false

Я хочу получить вывод:

userId    someString      varA     varB   someBool
   1      "example1"       0        null    true
   1      "example1"       2        null    true
   1      "example1"       5        null    true
   1      "example1"       1        null    true
   1      "example1"       20       null    true
   1      "example1"       5        null    true
   2      "example2"       null      1      false
   2      "example2"       null      2      false
   2      "example2"       null      9      false
   2      "example2"       null      9      false
   2      "example2"       null     null    false
   2      "example2"       null      6      false

Идеи?

(О, и я пытаюсь сделать это в общем, поэтому мне не нужно обновлять код при изменении схемы,а также потому, что схема фактическая довольно большая ...)

PS - Переходит к этому очень похожему, но другому вопросу, из которого я бесстыдно украл данные примера.

Редактировать: @oliik с победой, но было бы ТАКЖЕ замечательно увидеть путь к этому с df.flatMap (в основном потому, что я до сих пор не лажу flatMap)

1 Ответ

5 голосов
/ 15 мая 2019

Вы всегда можете сгенерировать выбор программно

val df = Seq(
  (1, "example1", Seq(0,2,5), Seq(Some(1),Some(2),Some(9)), true),
  (2, "example2", Seq(1,20,5), Seq(Some(9),Option.empty[Int],Some(6)), false)
).toDF("userId", "someString", "varA", "varB", "someBool")

val arrayColumns = df.schema.fields.collect {
  case StructField(name, ArrayType(_, _), _, _) => name
}

val dfs = arrayColumns.map { expname =>
  val columns = df.schema.fields.map {
    case StructField(name, ArrayType(_, _), _, _) if expname == name => explode(df.col(name)) as name
    case StructField(name, ArrayType(_, _), _, _) => lit(null) as name
    case StructField(name, _, _, _) => df.col(name)
  }
  df.select(columns:_*)
}

dfs.reduce(_ union _).show()
+------+----------+----+----+--------+
|userId|someString|varA|varB|someBool|
+------+----------+----+----+--------+
|     1|  example1|   0|null|    true|
|     1|  example1|   2|null|    true|
|     1|  example1|   5|null|    true|
|     2|  example2|   1|null|   false|
|     2|  example2|  20|null|   false|
|     2|  example2|   5|null|   false|
|     1|  example1|null|   1|    true|
|     1|  example1|null|   2|    true|
|     1|  example1|null|   9|    true|
|     2|  example2|null|   9|   false|
|     2|  example2|null|null|   false|
|     2|  example2|null|   6|   false|
+------+----------+----+----+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...