Я пытаюсь зарегистрировать Spark UDF, чтобы помочь мне преобразовать строку XML из таблицы, но получаю следующее исключение. Могу ли я знать, что мне не хватает? Я использую Scala 2.12.10 и Spark 2.4.4.
package org.mt.experiments
import org.apache.spark.sql.SparkSession
import scala.xml.transform.{RewriteRule, RuleTransformer}
import scala.xml.{Elem, Node, NodeSeq}
object Launcher2 {
case class Student(name: String, books: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local[*]")
.enableHiveSupport
.getOrCreate
import spark.implicits._
spark.udf.register("cleanXML", (xmlDoc: Node) => {
new RuleTransformer(new RewriteRule {
override def transform(node: Node): NodeSeq = node match {
case e: Elem if e.label == "author" => NodeSeq.Empty
case node => node
}
}).transform(xmlDoc)
})
val andy = Student(
name = "Andy",
books = "<books><book><<title>Functional Programming in Scala</title><author>Paul Chiusano and Runar Bjarnason</author><year>2014-12-26</year></book><book><title>Real and Complex Analysis</title><author>Walter Rudin</author><year>2015-05-19</year></book></books>"
)
val studentDF = Seq(andy).toDF()
studentDF.createOrReplaceTempView("studentDetails")
val tokDF = spark.sql("SELECT name, cleanXML(books) as books FROM studentDetails")
tokDF.show(false)
}
}
ОШИБКА
Exception in thread "main" scala.MatchError: scala.xml.Node (of class scala.reflect.internal.Types$ClassNoArgsTypeRef)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:760)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:926)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:925)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:740)
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:761)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:926)
at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:925)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:740)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:736)
at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:192)
at org.mt.experiments.Launcher2$.main(Launcher2.scala:22)
at org.mt.experiments.Launcher2.main(Launcher2.scala)