Unnesting 2D-массив структур в структуру 2D-массивов - PullRequest
2 голосов
/ 17 июня 2019

У меня есть столбец типа array<array<struct<a: String, b: Int>>>.

Я хочу столбец типа struct<a: array<array<String>>, b: array<array<Int>>.

В идеале, эта процедура должна автоматически разворачивать все структурные поля (т. Е. Без необходимости указывать поля "a" и "b" вручную), но все, что работает, было бы здесь чрезвычайно полезно.

Пример кода, который у меня есть (я пытаюсь превратить ds в expected).

case class Struct(foo: String, bar: Int)
case class Schema(structs: Vector[Vector[Struct]])

val ss = spark
import ss.implicits._

val ds = Seq(Schema(Vector(Vector(Struct("a", 1), Struct("b", 2)), Vector(Struct("c", 3))))).toDS

val expected = Seq(
    (Vector(Vector("a", "b"), Vector("c")), Vector(Vector(1, 2), Vector(3)))
).toDF("foo", "bar")

1 Ответ

1 голос
/ 17 июня 2019

Самое короткое решение - использовать transform функцию более высокого порядка (введена в Spark 2.4):

ds.selectExpr(
  "transform(structs, xs -> transform(xs, x -> x.foo)) as foo",
  "transform(structs, xs -> transform(xs, x -> x.bar)) as bar"
)

В более старой версии вам потребуется либо эквивалент udf*, либо набрать map:

ds.as[Schema]
  .map(x => (
    x.structs.map(_.map(_.foo)), 
    x.structs.map(_.map(_.bar))
)).toDF("foo", "bar")

Прежнее решение можно обобщить:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.DataFrame

def expand(ds: DataFrame, col: String) = {

  val fields = ds.schema(col).dataType match {
    case ArrayType(ArrayType(s: StructType, _), _) => s.fieldNames
  }
  val exprs = fields.map {
    field => expr(
      s"transform(`$col`, xs -> transform(xs, x -> x.`$field`)) as `$field`"
    )
  }
  ds.select(exprs: _*)
}

expand(ds.toDF, "structs")

Последнее, вероятно, не так много, если только вы не хотите использовать отражение Scala (а это серьезное излишество).


* Что-то вокруг этих строк должно сработать:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.functions.udf

def extract[T : TypeTag](field: String) = udf(
  (xs: Seq[Seq[Row]]) => xs.map(_.map(_.getAs[T](field)))
)

val extractString = extract[String] _
val extractInt = extract[Int] _

ds.select(
  extractString("foo")($"structs").as("foo"),
  extractInt("bar")($"structs").as("bar")
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...