Я новичок в Scala и Spark. Написал простой тестовый класс и застрял на этом вопросе на весь день. Пожалуйста, найдите следующий код
A. scala
class A(key :String) extends Serializable {
val this.key:String=key
def getKey(): String = { return this.key}
}
B. Scala
class B(key :String) extends Serializable {
val this.key:String=key
def getKey(): String = { return this.key}
}
Test. scala
import com.holdenkarau.spark.testing.{RDDComparisons, SharedSparkContext}
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
class Test extends FunSuite with SharedSparkContext with RDDComparisons with BeforeAndAfter with Serializable {
//comment this
private[this] val b1 = new B("test1")
test("Test RDD") {
val a1 = new A("test1")
val a2 = new A("test2")
val expected= sc.parallelize(Seq(a1,a2))
println(b1.getKey())
//val b1 = new B("test1")
//val key1 :String = b1.getKey()
expected.foreach{ a =>
//if(a.getKey().equalsIgnoreCase(key1))
if(a.getKey().equalsIgnoreCase(b1.getKey()))
print("hi")
}
}
}
Этот код вызывает исключение
Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
at com.adgear.adata.hhid.Test$$anonfun$1.apply$mcV$sp(Test.scala:19)
at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at com.adgear.adata.hhid.Test.org$scalatest$BeforeAndAfterAll$$super$run(Test.scala:7)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.adgear.adata.hhid.Test.run(Test.scala:7)
Когда я закомментирую объявление уровня класса b1 и использую объявление внутри самих методов теста, тогда "if(a.getKey().equalsIgnoreCase(b1.getKey()))
" это работает. И если я сохраню определение уровня класса b1, то «if(a.getKey().equalsIgnoreCase(b1.getKey()))
» выдает исключение. Чтобы решить эту проблему, я должен использовать «//val key1 :String = b1.getKey()" and "//if(a.getKey().equalsIgnoreCase(key1))
», тогда он работает.
Как видно, A, B и Test все реализуют Serializable, но я получаю это исключение. Что вызывает эту проблему?
Спасибо