как создать датафрейм в UDF - PullRequest
0 голосов
/ 21 ноября 2019

У меня проблема. Я хочу создать DataFrame в UDF и использовать мою модель для преобразования ее в другую. Но я получаю это исключение. Что-то не так в Spark Conf? Я не знаю. Кто-нибудь может помочь мне решить эту проблему?

Код:

val model = PipelineModel.load("/user/abel/model/pipeline_model")
val modelBroad = spark.sparkContext.broadcast(model)

def model_predict(id:Long, text:String):Double = {
  val modelLoaded = modelBroad.value
  val sparkss = SparkSession.builder.master("local[*]").getOrCreate()
  val dataDF = sparkss.createDataFrame(Seq((id,text))).toDF("id","text")
  val result = modelLoaded.transform(dataDF).select("prediction").collect().apply(0).getDouble(0)
  println(f"The prediction of $id and $text is $result")
  result
}

val udf_func = udf(model_predict _)
test.withColumn("prediction",udf_func($"id",$"text")).show()

Исключение:

Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.SparkPlan.sparkContext(SparkPlan.scala:56)
        at org.apache.spark.sql.execution.LocalTableScanExec.metrics$lzycompute(LocalTableScanExec.scala:37)
        at org.apache.spark.sql.execution.LocalTableScanExec.metrics(LocalTableScanExec.scala:36)
        at org.apache.spark.sql.execution.SparkPlan.resetMetrics(SparkPlan.scala:85)
        at org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3366)
        at org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3365)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:117)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
        at com.zamplus.mine.SparkSubmit$.com$zamplus$mine$SparkSubmit$$model_predict$1(SparkSubmit.scala:21)
        at com.zamplus.mine.SparkSubmit$$anonfun$1.apply(SparkSubmit.scala:40)
        at com.zamplus.mine.SparkSubmit$$anonfun$1.apply(SparkSubmit.scala:40)
        ... 22 more

...