лучший способ написать Spark Dataset [U] - PullRequest
0 голосов
/ 12 июля 2019

Хорошо, поэтому мне известно о том, что Dataset.as[U] просто меняет представление кадра данных для напечатанных операций.

Как видно в этом примере:

case class One(one: Int)

val df = Seq(
    (1,2,3),
    (11,22,33),
    (111,222,333)
    ).toDF("one", "two", "thre")

val ds : Dataset[One] = df.as[One]

ds.show

prints

+----+----+-----+
| one| two|three|
+----+----+-----+
|   1|   2|    3|
|  11|  22|   33|
| 111| 222|  333|
+----+----+-----+

Это совершенно нормально и работает в мою пользу большую часть времени.НО теперь мне нужно записать это ds на диск, только с этим столбцом one.

Чтобы реализовать схему, которую я мог бы сделать .map(x => x), так как это типизированная операция, схема класса case будет приниматьэффект.Эта операция также приводит к Dataset[One], но с базовыми данными, уменьшенными до столбца one.Это просто кажется ужасно дорогим, если смотреть на план выполнения

== Physical Plan ==
*SerializeFromObject [assertnotnull(input[0, $line2012488405320.$read$$iw$$iw$One, true]).one AS one#9408]
+- *MapElements <function1>, obj#9407: $line2012488405320.$read$$iw$$iw$One
   +- *DeserializeToObject newInstance(class $line2012488405320.$read$$iw$$iw$One), obj#9406: $line2012488405320.$read$$iw$$iw$One
      +- LocalTableScan [one#9391]

Какие альтернативные реализации можно достичь

ds.show
+----+
| one|
+----+
|   1|
|  11|
| 111|
+----+

ОБНОВЛЕНИЕ 1
Я думал ообщее решение проблемы.Может быть, что-то в этом роде?:

def caseClassAccessorNames[T <: Product](implicit tag: TypeTag[T]) = {
  typeOf[T]
    .members
    .collect {
      case m: MethodSymbol if m.isCaseAccessor => m.name
    }
    .map(m => m.toString)
}

def project[T <: Product](ds: Dataset[T])(implicit tag: TypeTag[T]): Dataset[T] = {
  import ds.sparkSession.implicits._

  val columnsOfT: Seq[Column] =
      caseClassAccessorNames
      .map(col)(scala.collection.breakOut)

  val t: DataFrame = ds.select(columnsOfT: _*)

  t.as[T]
}

Мне удалось заставить это работать для тривиального примера, но мне нужно оценить его дальше.Интересно, есть ли альтернативные, может быть, встроенные способы достижения чего-то подобного?

...