Spark: Как разбить тип структуры на несколько столбцов? - PullRequest
0 голосов
/ 03 сентября 2018

Я знаю, что этот вопрос задавался много раз на Переполнении стека и был удовлетворительным ответом в большинстве постов, но я не уверен, что это лучший способ в моем случае. У меня есть набор данных, который имеет несколько встроенных типов структур:

root
 |-- STRUCT1: struct (nullable = true)
 |    |-- FIELD_1: string (nullable = true)
 |    |-- FIELD_2: long (nullable = true)
 |    |-- FIELD_3: integer (nullable = true)
 |-- STRUCT2: struct (nullable = true)
 |    |-- FIELD_4: string (nullable = true)
 |    |-- FIELD_5: long (nullable = true)
 |    |-- FIELD_6: integer (nullable = true)
 |-- STRUCT3: struct (nullable = true)
 |    |-- FIELD_7: string (nullable = true)
 |    |-- FIELD_8: long (nullable = true)
 |    |-- FIELD_9: integer (nullable = true)
 |-- ARRAYSTRUCT4: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FIELD_10: integer (nullable = true)
 |    |    |-- FIELD_11: integer (nullable = true)

+-------+------------+------------+------------------+
|STRUCT1| STRUCT2    | STRUCT3    | ARRAYSTRUCT4     |
+-------+------------+------------+------------------+
|[1,2,3]|[aa, xx, yy]|[p1, q2, r3]|[[1a, 2b],[3c,4d]]|
+-------+------------+------------+------------------+

Я хочу преобразовать это в:

1. Набор данных, в котором структуры развернуты в столбцы.
2. Набор данных, в котором массив (ARRAYSTRUCT4) разбивается на строки.

root
 |-- FIELD_1: string (nullable = true)
 |-- FIELD_2: long (nullable = true)
 |-- FIELD_3: integer (nullable = true)
 |-- FIELD_4: string (nullable = true)
 |-- FIELD_5: long (nullable = true)
 |-- FIELD_6: integer (nullable = true)
 |-- FIELD_7: string (nullable = true)
 |-- FIELD_8: long (nullable = true)
 |-- FIELD_9: integer (nullable = true)
 |-- FIELD_10: integer (nullable = true)
 |-- FIELD_11: integer (nullable = true)

+-------+------------+------------+---------+     ---------+----------+
|FIELD_1| FIELD_2    | FIELD_3    | FIELD_4 |     |FIELD_10| FIELD_11 |
+-------+------------+------------+---------+ ... ---------+----------+
|1      |2           |3           |  aa     |     |  1a    |  2b      |
+-------+------------+------------+-----------------------------------+

Чтобы достичь этого, я мог бы использовать:

val expanded = df.select("STRUCT1.*", "STRUCT2.*", "STRUCT3.*", "STRUCT4")

с последующим взрывом:

val exploded = expanded.select(explode(expanded("STRUCT4")))

Однако мне было интересно, есть ли более функциональный способ сделать это, особенно выбор. Я мог бы использовать withColumn, как показано ниже:

data.withColumn("FIELD_1", $"STRUCT1".getItem(0))
      .withColumn("FIELD_2", $"STRUCT1".getItem(1))
      .....

Но у меня есть 80+ столбцов. Есть ли лучший способ добиться этого?

1 Ответ

0 голосов
/ 03 сентября 2018

Вы можете сначала сделать все столбцы struct -типа с помощью explode -все любые Array(struct) столбцы в struct столбцы с помощью foldLeft, а затем использовать map для интерполяции каждого из имен struct столбцов в col.*, как показано ниже:

import org.apache.spark.sql.functions._

case class S1(FIELD_1: String, FIELD_2: Long, FIELD_3: Int)
case class S2(FIELD_4: String, FIELD_5: Long, FIELD_6: Int)
case class S3(FIELD_7: String, FIELD_8: Long, FIELD_9: Int)
case class S4(FIELD_10: Int, FIELD_11: Int)

val df = Seq(
  (S1("a1", 101, 11), S2("a2", 102, 12), S3("a3", 103, 13), Array(S4(1, 1), S4(3, 3))),
  (S1("b1", 201, 21), S2("b2", 202, 22), S3("b3", 203, 23), Array(S4(2, 2), S4(4, 4)))
).toDF("STRUCT1", "STRUCT2", "STRUCT3", "ARRAYSTRUCT4")

// +-----------+-----------+-----------+--------------+
// |    STRUCT1|    STRUCT2|    STRUCT3|  ARRAYSTRUCT4|
// +-----------+-----------+-----------+--------------+
// |[a1,101,11]|[a2,102,12]|[a3,103,13]|[[1,1], [3,3]]|
// |[b1,201,21]|[b2,202,22]|[b3,203,23]|[[2,2], [4,4]]|
// +-----------+-----------+-----------+--------------+

val arrayCols = df.dtypes.filter( t => t._2.startsWith("ArrayType(StructType") ).
  map(_._1)
// arrayCols: Array[String] = Array(ARRAYSTRUCT4)

val expandedDF = arrayCols.foldLeft(df)((accDF, c) =>
  accDF.withColumn(c.replace("ARRAY", ""), explode(col(c))).drop(c)
)

val structCols = expandedDF.columns

expandedDF.select(structCols.map(c => col(s"$c.*")): _*).
  show
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
// |FIELD_1|FIELD_2|FIELD_3|FIELD_4|FIELD_5|FIELD_6|FIELD_7|FIELD_8|FIELD_9|FIELD_10|FIELD_11|
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
// |     a1|    101|     11|     a2|    102|     12|     a3|    103|     13|       1|       1|
// |     a1|    101|     11|     a2|    102|     12|     a3|    103|     13|       3|       3|
// |     b1|    201|     21|     b2|    202|     22|     b3|    203|     23|       2|       2|
// |     b1|    201|     21|     b2|    202|     22|     b3|    203|     23|       4|       4|
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+

Обратите внимание, что для простоты предполагается, что ваш DataFrame имеет только столбцы типа struct и Array(struct). Если есть другие типы данных, просто примените условия фильтрации к arrayCols и structCols соответственно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...