Ошибка сериализации задачи при использовании UDF в Spark - PullRequest
0 голосов
/ 08 сентября 2018

Когда я создаю функцию UDF, как показано выше, я получаю ошибку сериализации задачи. Эта ошибка появляется только тогда, когда я запускаю код в режиме развертывания кластера, используя spark-submit. Тем не менее, он хорошо работает в спарк-оболочке.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

def mfnURL(arr: WrappedArray[String]): String = {
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0)
    return null
  else {
    filterArr.groupBy(identity).maxBy(_._2.size)._1
  }
}

val mfnURLUDF = udf(mfnURL _)

def windowSpec = Window.partitionBy("nodeId", "url", "typology")                                                     
val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)                                                                                            
  .groupBy("nodeId","typology")                                                                                      
  .agg(
  first("url"),
  mfnURLUDF(collect_list("source_url")),
  min("minTimestamp"),
  max("maxTimestamp")
)

Я пытался добавить spark.udf.register("mfnURLUDF",mfnURLUDF), но это не решило проблему.

1 Ответ

0 голосов
/ 08 сентября 2018

Вы также можете попробовать создать udf следующим образом:

val mfnURL = udf { arr: WrappedArray[String] =>
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0)
    return null
  else {
    filterArr.groupBy(identity).maxBy(_._2.size)._1
  }
}
...