Модульные тесты с использованием Spark Session: SparkContext был закрыт - PullRequest
0 голосов
/ 14 января 2019

У нас большой проект с несколькими наборами тестов, и в каждом наборе в среднем 3 теста.

Для модульных тестов мы используем Spark в автономном режиме (Spark Standalone), поэтому YARN в качестве менеджера ресурсов не используется. Каждый набор тестов:

Инициализирует SparkSession:

  // Shared Spark session for the suite, exposed implicitly to code under test.
  // The explicit type is required for non-local implicits in Scala 3 and keeps the
  // implicit's type from silently changing if the builder chain changes.
  // NOTE(review): getOrCreate() returns an already-active session if one exists in this
  // JVM, so several suites may end up holding the very same instance.
  implicit val spark: SparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()

расширяет BeforeAndAfterAll:

// Test suite skeleton (body elided). BeforeAndAfterAll is mixed in so that afterAll
// can be overridden to stop the SparkSession once all tests in the suite have run.
class MyTestsSpec extends WordSpec
  with Matchers
  with BeforeAndAfterAll {
...
}

и переопределяет afterAll:

  /** Stops the suite's SparkSession after all tests, then runs the parent teardown.
    *
    * Declared and invoked with `()` to match ScalaTest's `afterAll(): Unit` signature.
    * Omitting the empty parameter list relies on auto-application, which Scala 2.13
    * deprecates and Scala 3 removes. The `finally` guarantees the parent teardown runs
    * even if stopping Spark throws.
    *
    * NOTE(review): `spark` is obtained via `getOrCreate()`, which may hand every suite in
    * the JVM the same session. Stopping it here can cancel jobs still running in other
    * suites ("SparkContext was shut down"). Consider a single shared session stopped once
    * per JVM instead.
    */
  override def afterAll(): Unit = {
    try {
      spark.stop()
    } finally {
      super.afterAll()
    }
  }

Для нашего решения настроено CI-задание в Jenkins, и оно очень часто получает статус Unstable (нестабильное) из-за тестов, падающих со следующей ошибкой:

Message d'erreur
Job 9 cancelled because SparkContext was shut down
Pile d'exécution
org.apache.spark.SparkException: Job 9 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:820)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:818)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
// some business classes
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
// some business classes
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
// some business classes
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
    at sbt.TestRunner.runTest$1(TestFramework.scala:106)
    at sbt.TestRunner.run(TestFramework.scala:117)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:262)
    at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:233)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:262)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:262)
    at sbt.TestFunction.apply(TestFramework.scala:271)
    at sbt.Tests$.processRunnable$1(Tests.scala:307)
    at sbt.Tests$.$anonfun$makeSerial$1(Tests.scala:313)
    at sbt.std.Transform$$anon$3.$anonfun$apply$2(System.scala:46)
    at sbt.std.Transform$$anon$4.work(System.scala:66)
    at sbt.Execute.$anonfun$submit$2(Execute.scala:262)
    at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
    at sbt.Execute.work(Execute.scala:271)
    at sbt.Execute.$anonfun$submit$1(Execute.scala:262)
    at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:174)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:36)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

При этом, если запустить тест отдельно, он проходит успешно, без каких-либо проблем.

1 Ответ

0 голосов
/ 02 августа 2019

У меня была похожая проблема.

У меня было несколько наборов тестов, использующих Spark, и успешно проходил только первый набор. Во всех наборах я вызывал spark.close(). После того как я удалил этот вызов из всех наборов, всё заработало.

Изучив код SparkSession, я пришёл к выводу: поскольку в одной JVM может существовать только один SparkContext, а все тесты выполняются в одной и той же JVM, после первой остановки контекст становится непригодным для оставшейся части этой «сессии JVM».

...