Когда я создаю функцию UDF, как показано выше, я получаю ошибку сериализации задачи. Эта ошибка появляется только тогда, когда я запускаю код в режиме развертывания кластера, используя spark-submit
. Тем не менее, он хорошо работает в спарк-оболочке.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
def mfnURL(arr: WrappedArray[String]): String = {
val filterArr = arr.filterNot(_ == null)
if (filterArr.length == 0)
return null
else {
filterArr.groupBy(identity).maxBy(_._2.size)._1
}
}
val mfnURLUDF = udf(mfnURL _)
def windowSpec = Window.partitionBy("nodeId", "url", "typology")
val result = df.withColumn("count", count("url").over(windowSpec))
.orderBy($"count".desc)
.groupBy("nodeId","typology")
.agg(
first("url"),
mfnURLUDF(collect_list("source_url")),
min("minTimestamp"),
max("maxTimestamp")
)
Я пытался добавить spark.udf.register("mfnURLUDF",mfnURLUDF)
, но это не решило проблему.