Вы можете сначала сделать все столбцы struct
-типа с помощью explode
-все любые Array(struct)
столбцы в struct
столбцы с помощью foldLeft
, а затем использовать map
для интерполяции каждого из имен struct
столбцов в col.*
, как показано ниже:
import org.apache.spark.sql.functions._
case class S1(FIELD_1: String, FIELD_2: Long, FIELD_3: Int)
case class S2(FIELD_4: String, FIELD_5: Long, FIELD_6: Int)
case class S3(FIELD_7: String, FIELD_8: Long, FIELD_9: Int)
case class S4(FIELD_10: Int, FIELD_11: Int)
val df = Seq(
(S1("a1", 101, 11), S2("a2", 102, 12), S3("a3", 103, 13), Array(S4(1, 1), S4(3, 3))),
(S1("b1", 201, 21), S2("b2", 202, 22), S3("b3", 203, 23), Array(S4(2, 2), S4(4, 4)))
).toDF("STRUCT1", "STRUCT2", "STRUCT3", "ARRAYSTRUCT4")
// +-----------+-----------+-----------+--------------+
// | STRUCT1| STRUCT2| STRUCT3| ARRAYSTRUCT4|
// +-----------+-----------+-----------+--------------+
// |[a1,101,11]|[a2,102,12]|[a3,103,13]|[[1,1], [3,3]]|
// |[b1,201,21]|[b2,202,22]|[b3,203,23]|[[2,2], [4,4]]|
// +-----------+-----------+-----------+--------------+
val arrayCols = df.dtypes.filter( t => t._2.startsWith("ArrayType(StructType") ).
map(_._1)
// arrayCols: Array[String] = Array(ARRAYSTRUCT4)
val expandedDF = arrayCols.foldLeft(df)((accDF, c) =>
accDF.withColumn(c.replace("ARRAY", ""), explode(col(c))).drop(c)
)
val structCols = expandedDF.columns
expandedDF.select(structCols.map(c => col(s"$c.*")): _*).
show
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
// |FIELD_1|FIELD_2|FIELD_3|FIELD_4|FIELD_5|FIELD_6|FIELD_7|FIELD_8|FIELD_9|FIELD_10|FIELD_11|
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
// | a1| 101| 11| a2| 102| 12| a3| 103| 13| 1| 1|
// | a1| 101| 11| a2| 102| 12| a3| 103| 13| 3| 3|
// | b1| 201| 21| b2| 202| 22| b3| 203| 23| 2| 2|
// | b1| 201| 21| b2| 202| 22| b3| 203| 23| 4| 4|
// +-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+
Обратите внимание, что для простоты предполагается, что ваш DataFrame имеет только столбцы типа struct
и Array(struct)
. Если есть другие типы данных, просто примените условия фильтрации к arrayCols
и structCols
соответственно.