У меня есть потоковый фрейм данных, созданный с использованием искровой структурированной потоковой передачи. Как это-
val dataStream =
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", topic)
.load()
Теперь, когда я пытаюсь создать набор данных из datastream
с дополнительным столбцом с именем newKey
, выдается следующая ошибка:
[error] (run-main-0) java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
[error] - field (class: "org.apache.spark.sql.Row", name: "_2")
[error] - root class: "scala.Tuple2"
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_2")
- root class: "scala.Tuple2"
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:642)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:444)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:820)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:444)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:636)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:624)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:444)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:820)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:444)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:433)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
Код, который я использую, выглядит следующим образом:
import spark.implicits._
implicit val rowEncoder: ExpressionEncoder[Row] = RowEncoder(dataStream.schema)
val dsStream =
dataStream
.select(lit("a").as("newKey"), col("*"))
.as[(String, Row)]
.writeStream
.format("console")
.start()
Может кто-нибудь помочь мне решить это?