В версии Spark: 2.4.0 я пытаюсь выполнить приведенный ниже код в указанном DataFrame: unfoldedDF: org.apache.spark.sql.DataFrame
movieid: integer
words: array - element: строка
токены: строка
val tokensWithDf = unfoldedDF.groupBy("tokens").agg(countDistinct("movieid") as "df")
tokensWithDf.show()
Новый созданный фрейм данных: tokensWithDf: org.apache.spark.sql.DataFrame
токены: строка
df: long
На этом выполняется следующая операция.
def findIdf(x : Long) : Double = {scala.math.log10((42306).toDouble/x)}
val sqlfunc = udf(findIdf _)
tokensWithDf.withColumn("idf", sqlfunc(col("df"))).show()
Сбой со следующим исключением:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2519)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:866)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:865)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:379)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:865)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:66)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)