org.apache.spark.SparkException: задача не сериализуема. Причина: java.io.NotSerializableException - PullRequest
1 голос
/ 27 октября 2019

У меня есть два кода Scala - MyMain.scala и MyFunction.scala, построенные отдельно, а встроенная банка MyFunction будет действовать как UDF в MyMain.

MyFunction.scala в основном содержит класс Java с открытым методом public String myFunc(String val0, String val1). Проект построен в SBT, и выходные данные компиляции build_jar сохраняются как артефакт (только требуемый класс, т.е. MyFunction.class, а не зависимости).

MyMain.scala импортирует указанный выше jar артефакта в папку lib в папке и добавляетсяв путь к классам, используя unmanagedBase := baseDirectory.value / "lib" в build.sbt

Итак, структура проекта MyMain.scala выглядит следующим образом:

MyMain
| 
-lib/MyFunction.jar
       |
       - META-INF/MANIFEST.MF
       - MyFunction.class
-project
-src/main/scala/MyMain.scala
-build.sbt

/ Что мне нужно сделать /

Я хочу определить UDF в MyMain.scala на MyFunction.class в MyFunction.jar, который добавляется в путь к классам lib. Я определил UDF, но когда я пытаюсь использовать его в кадре данных Spark внутри MyMain.scala, он выдает «Task not serializable» java.io.NotSerializableException, как показано ниже:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:253)
  ... 58 elided
Caused by: java.io.NotSerializableException: MyMain$
Serialization stack:
    - object not serializable (class: MyMain$, value: MyMain$@11f25cf)
    - field (class: $iw, name: MyMain$module, type: class MyMain$)
    - object (class $iw, $iw@540705e8)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e6e1038)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7587f2a0)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e00f263)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3fbfe419)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5172e87b)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5ec96f75)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@26f6de78)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@18c3bc83)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@35d674ee)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5712092f)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6980c2e6)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6ce299e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@406b8acb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@73d71e61)
    - field (class: $line47.$read, name: $iw, type: class $iw)
    - object (class $line47.$read, $line47.$read@72ee2f87)
    - field (class: $iw, name: $line47$read, type: class $line47.$read)
    - object (class $iw, $iw@22c4de5a)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@3daea539)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 92 more

/ В чем может быть причина /

MyMain.scala ссылается на некоторый несериализуемый экземпляр класса внутри некоторого преобразования в кадре данных Spark

/ Что я пробовал /

object MyFunction extends Serializable {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0:String,var1:String) : String = {
    myFuncSingleton.myFunc(var0,var1)
  }
}

import org.apache.spark.sql.functions.udf
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

object MyMain {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]) : Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}

Ссылки по ссылкам ниже как решать-не-сериализуемые ошибки-когда-создаются-объекты-в-искре-udfs

1 Ответ

0 голосов
/ 15 ноября 2019

Незначительно подправил код, и это решило мою проблему.

Хотя я не полностью понимаю внутреннюю работу компилятора Scala и то, как он обрабатывает UDF, я постараюсь объяснить свое решение и то, что могло быть причиной Task not serializable ошибки:

  1. Использование переменной myUDF в withColumn(...) не входит ни в какое закрытие RDD.
  2. Внутри определения udf(...) вне программы драйвера вызов getMyFunc(...) для объекта Scala MyFunction эквивалентен вызову статического метода и, следовательно, объект MyFunction не нуждается в сериализации, так как он используется какобъект singleton, а не как экземпляр класса MyFunction (определенный внутри MyFunction.jar). Это объясняет изменение определения MyFunction с object MyFunction extends Serializable на object MyFunction.
  3. Однако внутри одноэлементного объекта MyFunction «оболочки» myFuncSingleton определяется как экземпляр класса MyFunction (в jar), а myFuncSingleton.myFunc(...) вызывает метод myFunc(...) этого экземпляра.
  4. Однако экземпляр myFuncSingleton и его класс MyFunction, на которые ссылается программа драйвера через myUDF, находятся за пределами закрытия RDD (как упомянуто в 1.), и, следовательно, класс MyFunction должен быть явно сериализован, то есть public class MyFunction implements java.io.Serializable (поскольку встроенный в jar Java-класс)
  5. Как упоминалось в 1., поскольку вызов UDF в withColumn(...) не находится в закрытии RDD, объект MyMain необходимо сериализовать, чтобы сделать UDF доступныммежду разделами, то есть object MyMain extends Serializable

    object MyFunction {
      val myFuncSingleton = new MyFunction()
      def getMyFunc(var0:String,var1:String) : String = {
        myFuncSingleton.myFunc(var0,var1)
      }
    }
    
    import org.apache.spark.sql.functions.udf
    val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })
    
    object MyMain extends Serializable {
      val spark = ...
      val hadoopfs = ...
      def main(args: Array[String]) : Unit = {
        val df1 = ...
        val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
      }
    }
    

Примечание:

  • Подводя итог, я вызываю метод экземпляра MyFunction через статический вызов метода MyFunction singletonобъект. Следовательно, val myFuncVar = new MyFunction() должно быть более подходящим, чем val myFuncSingleton = new MyFunction().
  • Я не до конца понимаю нюансы закрытия RDD и не уверен, находится ли withColumn () вне закрытия RDD, но предполагается, что радиобъяснение.

Получил очень хорошее объяснение здесь: Как Spark обрабатывает объект

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...