Создайте последовательность объектов класса case из Spark DataFrame - PullRequest
0 голосов
/ 28 апреля 2020

Как перебрать строки Spark DataFrame и добавить их в последовательность объектов класса case?

DF1:

val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

Класс дела:

case class ValuePerNumber(num:String, wrd:String, defaultID:Int, size: Long=0) {}

Ожидаемый результат:

Seq(ValuePerNumber("202003101750", "202003101700",0, 122), ValuePerNumber("202003101800", "202003101700",0, 12), ValuePerNumber("202003101750", "202003101700",0, 42)) 

В каждом случае я могу использовать defaultID как 0. Я не уверен, как подойти и решить эту проблему и был бы очень признателен за любое решение / предложение!

Я пробовал следующее:

val x = someDF.as[ValuePerNumber].collect()

Я получаю следующую ошибку:

org.apache.spark.sql.AnalysisException: cannot resolve '`num`' given input columns: [number, word, value];

Ответы [ 3 ]

3 голосов
/ 28 апреля 2020
val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

case class ValuePerNumber(number:String, word:String, defaultID:Int, value: Long)

someDF.withColumn("defaultId", lit(0)).as[ValuePerNumber].collect.toSeq
3 голосов
/ 28 апреля 2020

Количество столбцов и имен в DataFrame и Case Class должны совпадать, чтобы использовать as[ValuePerNumber] непосредственно в DataFrame без извлечения значений.

  1. size недоступно в DataFrame, поэтому добавлено с помощью withColumn
  2. Имена столбцов не совпадают ни в классе DF, ни в классе. Изменено в соответствии с DF и классом дела.
scala> :paste
// Entering paste mode (ctrl-D to finish)

val someDF = Seq(("202003101750", "202003101700",122),("202003101800", "202003101700",12),("202003101750", "202003101700",42))
.toDF("number", "word","value")
.withColumn("size",lit(0)) // Added this to match your case class columns


// Exiting paste mode, now interpreting.

someDF: org.apache.spark.sql.DataFrame = [number: string, word: string ... 2 more fields]

scala> case class ValuePerNumber(number:String, word:String, value:Int, size: Long=0) // Modified column names to match your dataframe column names.
defined class ValuePerNumber

scala> someDF.as[ValuePerNumber].show(false)
+------------+------------+-----+----+
|number      |word        |value|size|
+------------+------------+-----+----+
|202003101750|202003101700|122  |0   |
|202003101800|202003101700|12   |0   |
|202003101750|202003101700|42   |0   |
+------------+------------+-----+----+


scala>
2 голосов
/ 28 апреля 2020

Вы можете создать Dataset[ValuePeerNumber] и collect как Seq

val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

val result = someDF.map(r => ValuePerNumber(r.getAs[String](0), r.getAs[String](1), r.getAs[Int](2))).collect().toSeq

Вы также можете добавить столбец в фрейме данных и отредактировать имя столбца в соответствии с классом случая, который вы можете сделать напрямую

val x = someDF.as[ValuePerNumber].collect()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...