Самое короткое решение - использовать transform
функцию более высокого порядка (введена в Spark 2.4):
ds.selectExpr(
"transform(structs, xs -> transform(xs, x -> x.foo)) as foo",
"transform(structs, xs -> transform(xs, x -> x.bar)) as bar"
)
В более старой версии вам потребуется либо эквивалент udf
*, либо набрать map
:
ds.as[Schema]
.map(x => (
x.structs.map(_.map(_.foo)),
x.structs.map(_.map(_.bar))
)).toDF("foo", "bar")
Прежнее решение можно обобщить:
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def expand(ds: DataFrame, col: String) = {
val fields = ds.schema(col).dataType match {
case ArrayType(ArrayType(s: StructType, _), _) => s.fieldNames
}
val exprs = fields.map {
field => expr(
s"transform(`$col`, xs -> transform(xs, x -> x.`$field`)) as `$field`"
)
}
ds.select(exprs: _*)
}
expand(ds.toDF, "structs")
Последнее, вероятно, не так много, если только вы не хотите использовать отражение Scala (а это серьезное излишество).
* Что-то вокруг этих строк должно сработать:
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.functions.udf
def extract[T : TypeTag](field: String) = udf(
(xs: Seq[Seq[Row]]) => xs.map(_.map(_.getAs[T](field)))
)
val extractString = extract[String] _
val extractInt = extract[Int] _
ds.select(
extractString("foo")($"structs").as("foo"),
extractInt("bar")($"structs").as("bar")
)