Spark RDD: AggregateByKey создает задачу, не сериализуемую, и я не вижу несериализуемые объекты - PullRequest
0 голосов
/ 04 мая 2020

У меня есть такой код:

object Helpers {
   val getPercentVariationInterval = ( 
                                       prevStartClose: Double,
                                       prevEndClose: Double,
                                       prevStartDate: Date,
                                       prevEndDate: Date,
                                       newClose: Double,
                                       newDate: Date
                                      ) => 
                                      { 
                                       return something as 
                                       (Double, Double, Date, Date)
                                      }

   val seqOp = (
                 acc: (Double, Double, Date, Date, Double, Double, Long, Int), values: (Double, Double, Double, Double, Long, Int, Date)
               ) => 
               { 
                  getPercentVariationInterval()
                  return something as (Double, Double, Date, Date, Double, Double, Long, Int)
               }

   val compOP = (
                 acc1: (Double, Double, Date, Date, Double, Double, Long, Int), acc2: (Double, Double, Date, Date, Double, Double, Long, Int)
                ) => 
                {
                  getPercentVariationInterval()
                  return something as (Double, Double, Date, Date, Double, Double, Long, Int)
                }
}

   object JobOne extends Serializable {
      val run = () => {
      val rdd = ...
      val zeroVal = some value
//the RDD looks like RDD[(String, (Double, Double, Double, Double, Long, Int, Date))] but warns me about an implicit conversion, I don't know if that's relevant
      val result = rdd.aggregateByKey(zeroVal)(seqOP,compOP)
      }


 }
    object App{
   def  main(args: Array[String]) {
      JobOne.run()
   }
}

Ошибка такая:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$1(PairRDDFunctions.scala:86)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:75)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$aggregateByKey$1(PairRDDFunctions.scala:168)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:157)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$aggregateByKey$5(PairRDDFunctions.scala:197)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:197)
    at big_data.job_one.JobOne$.$anonfun$run$1(App.scala:101)
    at big_data.job_one.App$.main(App.scala:116)
    at big_data.job_one.App.main(App.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$aggregateByKey$2:([BLscala/reflect/ClassTag;Lscala/runtime/LazyRef;)Ljava/lang/Object;, instantiatedMethodType=()Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$832/956429999, org.apache.spark.rdd.PairRDDFunctions$$Lambda$832/956429999@150ede8b)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$aggregateByKey$3:(Lscala/Function2;Lscala/Function0;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$833/925152318, org.apache.spark.rdd.PairRDDFunctions$$Lambda$833/925152318@4b3fe06e)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
    ... 21 more

Я не понимаю, какая часть этого не сериализуется, пытается запустить seqOP и compOP внутри JobOne, просто вызывая их, работает (например, println (seqOP (что-то, что-то))), проблема возникает, если я передаю функцию в aggregateByKey. Я уже прочитал несколько ответов, но, похоже, ничего не помогло, ни расширение Serializable, ни превращение def в функции.

Я попытался поместить три функции в объект самостоятельно, я попытался просто дать им пощечину как анонимные функции внутри aggregateByKey, я попытался изменить аргументы и тип возвращаемого значения на что-то более простое. Ничего не работает.

Это весь код, если нужно, пожалуйста, это сводит меня с ума: https://scalafiddle.io/sf/kh5zcN4/0

Редактировать: извините, я удалил оригинальный вопрос по ошибке, сейчас 3 часа ночи, и я пытаюсь понять это часами.

Пример данных по запросу: https://pastebin.com/NDYFX8pJ

1 Ответ

0 голосов
/ 04 мая 2020

Я не уверен, что именно было не так, но это как-то вина Эклипса. Использование IntelliJ не дало мне таких проблем.

...