Scala Тест SparkException: задача не сериализуема - PullRequest
0 голосов
/ 20 марта 2020

Я новичок в Scala и Spark. Написал простой тестовый класс и застрял на этом вопросе на весь день. Пожалуйста, найдите следующий код

A. scala

class A(key :String) extends  Serializable {
     val this.key:String=key

     def getKey(): String = { return this.key}
}

B. Scala

class B(key :String) extends  Serializable {
     val this.key:String=key
     def getKey(): String = { return this.key}
 }

Test. scala

import com.holdenkarau.spark.testing.{RDDComparisons, SharedSparkContext}
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter

class Test extends FunSuite with SharedSparkContext with RDDComparisons with BeforeAndAfter with Serializable {

  //comment this
  private[this] val b1 = new B("test1")

  test("Test RDD") {

    val a1 = new A("test1")
    val a2 = new A("test2")

    val expected= sc.parallelize(Seq(a1,a2))
    println(b1.getKey())
     //val b1 = new B("test1")
    //val key1 :String = b1.getKey()
    expected.foreach{ a =>
      //if(a.getKey().equalsIgnoreCase(key1))
        if(a.getKey().equalsIgnoreCase(b1.getKey()))
          print("hi")
    }
  }
}

Этот код вызывает исключение

Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
    at com.adgear.adata.hhid.Test$$anonfun$1.apply$mcV$sp(Test.scala:19)
    at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
    at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
    at com.adgear.adata.hhid.Test.org$scalatest$BeforeAndAfterAll$$super$run(Test.scala:7)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
    at com.adgear.adata.hhid.Test.run(Test.scala:7)

Когда я закомментирую объявление уровня класса b1 и использую объявление внутри самих методов теста, тогда "if(a.getKey().equalsIgnoreCase(b1.getKey()))" это работает. И если я сохраню определение уровня класса b1, то «if(a.getKey().equalsIgnoreCase(b1.getKey()))» выдает исключение. Чтобы решить эту проблему, я должен использовать «//val key1 :String = b1.getKey()" and "//if(a.getKey().equalsIgnoreCase(key1))», тогда он работает.

Как видно, A, B и Test все реализуют Serializable, но я получаю это исключение. Что вызывает эту проблему?

Спасибо

1 Ответ

1 голос
/ 20 марта 2020

Объявление класса как Serializable не означает, что он может быть сериализован, если все его поля также не являются Serializable.

Поскольку ваш класс Test расширяет Funsuite, у него будет поле "assertionsHelper", которое не Сериализуемый. Поэтому, когда вы ссылаетесь на поле «b1» в своем методе «forEach», Spark попытается сериализовать экземпляр Test вместе со всем его полем (включая assertionsHelper).

Если вы хотите избежать этого, вы ' Придется либо определить b1 где-нибудь еще (в области действия метода тестирования или сопутствующем объекте), либо разыменовать b1 в новую переменную, прежде чем включать ее в функцию forEach:

val b1_ref = b1
expected.foreach { a =>
  if (a.getKey().equalsIgnoreCase(b1_ref.getKey()))
    print("hi")
}

PS: при обнаружении исключение сериализации: у вас обычно есть доступ к «стеку сериализации» в журналах, которые точно указывают, какой объект вызвал ошибку

...