Задание ошибки Spark udf не сериализуемо (scala) - PullRequest
0 голосов
/ 07 октября 2019

Я работаю над проектом Scala Spark. Я хотел бы получить формулировку кода с 2 кадрами данных. У меня есть 2 dataFrames:

  • один с кулоном с кодом, который я хотел бы получить формулировку. (Интерес столбца)

dataframe

  • второй с 2 ​​столбцами код и формулировка этого кода.

датафрейм

Я делаю это:

def CodeToInterest(df: sql.DataFrame, codesList: sql.DataFrame) : sql.DataFrame = {
val spark =  SparkSession.builder().getOrCreate()
import spark.implicits._

val transformList = udf((init: Array[String]) => {
  if(init == null) return null
  else init.map((code: String) => {
    if(!code.startsWith("IAB")) code
    else codesList.filter($"Code" === code)
        .first()
        .getAs[String]("Interest")
  })
}).apply(col("interests"))

df.withColumn("newInterests", transformList)

}

Но у меня есть эта ошибка.

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:850)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
    at SassWI.RetrieveData$.main(RetrieveData.scala:46)
    at SassWI.RetrieveData.main(RetrieveData.scala) 
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
    - object not serializable (class: java.lang.Object, value: java.lang.Object@3aeb267)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class SassWI.Etl$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic SassWI/Etl$.$anonfun$CodeToInterest$1:(Ljava/lang/Object;Lorg/apache/spark/sql/Dataset;Lorg/apache/spark/sql/SparkSession;[Ljava/lang/String;)[Ljava/lang/String;, instantiatedMethodType=([Ljava/lang/String;)[Ljava/lang/String;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class SassWI.Etl$$$Lambda$2075/1442249061, SassWI.Etl$$$Lambda$2075/1442249061@4b56b517)
    - element of array (index: 5)
    - array (class [Ljava.lang.Object;, size 6)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1308/216359372, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1308/216359372@3acc3ee)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 33 more

У вас есть представление о том, что вызвало эту ошибку? Благодарю.

1 Ответ

0 голосов
/ 14 октября 2019

Я наконец-то нашел решение этой проблемы. Согласно документации, массив в кадре данных Spark представлен в виде изменяемого массива. WrappedArray в Scala. Вот почему двигатель не может использовать этот тип!

    val spark =  SparkSession.builder().getOrCreate()
    import spark.implicits._
    val codes: Map[String, String] = codesList.as[(String, String)].collect().toMap

    val transformList = udf((init: mutable.WrappedArray[String]) => {
      if(init == null) init
      else {
        init.map((code: String) => {
          if(!code.startsWith("IAB")) code.toLowerCase()
          else codes(code).toLowerCase()
        })
      }
    }).apply(col("interests"))

    df.withColumn("newInterests", array_distinct(transformList))
  }

Спасибо всем, кто помог!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...