У меня есть два кода Scala - MyMain.scala и MyFunction.scala, построенные отдельно, а встроенная банка MyFunction будет действовать как UDF в MyMain.
MyFunction.scala в основном содержит класс Java с открытым методом public String myFunc(String val0, String val1)
. Проект построен в SBT, и выходные данные компиляции build_jar сохраняются как артефакт (только требуемый класс, т.е. MyFunction.class, а не зависимости).
MyMain.scala импортирует указанный выше jar артефакта в папку lib в папке и добавляетсяв путь к классам, используя unmanagedBase := baseDirectory.value / "lib"
в build.sbt
Итак, структура проекта MyMain.scala выглядит следующим образом:
MyMain
|
-lib/MyFunction.jar
|
- META-INF/MANIFEST.MF
- MyFunction.class
-project
-src/main/scala/MyMain.scala
-build.sbt
/ Что мне нужно сделать /
Я хочу определить UDF в MyMain.scala на MyFunction.class в MyFunction.jar, который добавляется в путь к классам lib. Я определил UDF, но когда я пытаюсь использовать его в кадре данных Spark внутри MyMain.scala, он выдает «Task not serializable» java.io.NotSerializableException, как показано ниже:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
at MyMain$.main(<pastie>:253)
... 58 elided
Caused by: java.io.NotSerializableException: MyMain$
Serialization stack:
- object not serializable (class: MyMain$, value: MyMain$@11f25cf)
- field (class: $iw, name: MyMain$module, type: class MyMain$)
- object (class $iw, $iw@540705e8)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e6e1038)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7587f2a0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e00f263)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3fbfe419)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5172e87b)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5ec96f75)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@26f6de78)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@18c3bc83)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@35d674ee)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5712092f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6980c2e6)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6ce299e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@406b8acb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@73d71e61)
- field (class: $line47.$read, name: $iw, type: class $iw)
- object (class $line47.$read, $line47.$read@72ee2f87)
- field (class: $iw, name: $line47$read, type: class $line47.$read)
- object (class $iw, $iw@22c4de5a)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@3daea539)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function2>)
- element of array (index: 9)
- array (class [Ljava.lang.Object;, size 15)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 92 more
/ В чем может быть причина /
MyMain.scala ссылается на некоторый несериализуемый экземпляр класса внутри некоторого преобразования в кадре данных Spark
/ Что я пробовал /
object MyFunction extends Serializable {
val myFuncSingleton = new MyFunction()
def getMyFunc(var0:String,var1:String) : String = {
myFuncSingleton.myFunc(var0,var1)
}
}
import org.apache.spark.sql.functions.udf
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })
object MyMain {
val spark = ...
val hadoopfs = ...
def main(args: Array[String]) : Unit = {
val df1 = ...
val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
}
}
Ссылки по ссылкам ниже как решать-не-сериализуемые ошибки-когда-создаются-объекты-в-искре-udfs