Репликация строк в наборе данных Spark N раз - PullRequest
0 голосов
/ 27 февраля 2019

Когда я пытаюсь сделать что-то подобное в Spark:

val replicas = 10
val dsReplicated = ds flatMap (a => 0 until replicas map ((a, _)))

, я получаю следующее исключение:

java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_1")
- root class: "scala.Tuple2"
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:619)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
  ... 48 elided

Я могу добиться этого, используя Spark DataFrame с функцией explode,Я хотел бы добиться чего-то подобного, используя наборы данных.

Для справки, вот код, который реплицирует строки с использованием API DataFrame:

val dfReplicated = df.
      withColumn("__temporarily__", typedLit((0 until replicas).toArray)).
      withColumn("idx", explode($"__temporarily__")).
      drop($"__temporarily__")

1 Ответ

0 голосов
/ 27 февраля 2019

Вот один из способов сделать это:

case class Zip(zipcode: String)
case class Person(id: Int,name: String,zipcode: List[Zip])

data: org.apache.spark.sql.Dataset[Person]
data.show()

+---+----+--------------+
| id|name|       zipcode|
+---+----+--------------+
|  1| AAA|[[MVP], [RB2]]|
|  2| BBB|[[KFG], [YYU]]|
|  3| CCC|[[JJJ], [7IH]]|
+---+----+--------------+  

data.printSchema

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- zipcode: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- zipcode: string (nullable = true)

val df = data.withColumn("ArrayCol",explode($"zipcode"))
df.select($"id",$"name",$"ArrayCol.zipcode").show()

Вывод:

+---+----+-------+
| id|name|zipcode|
+---+----+-------+
|  1| AAA|    MVP|
|  1| AAA|    RB2|
|  2| BBB|    KFG|
|  2| BBB|    YYU|
|  3| CCC|    JJJ|
|  3| CCC|    7IH|
+---+----+-------+

Использование Dataset:

val resultDS = data.flatMap(x => x.zipcode.map(y => (x.id,x.name,y.zipcode)))
resultDS.show(false)

//resultDS:org.apache.spark.sql.Dataset[(Int, String, String)] = 
//  [_1: integer, _2: string ... 1 more fields] 

//+---+---+---+
//|_1 |_2 |_3 |
//+---+---+---+
//|1  |AAA|MVP|
//|1  |AAA|RB2|
//|2  |BBB|KFG|
//|2  |BBB|YYU|
//|3  |CCC|JJJ|
//|3  |CCC|7IH|
//+---+---+---+ 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...