Global Sparksession завершается, перезапускается и заканчивается с ошибкой ... при использовании ForEachPartition в Spark / Scala - PullRequest
0 голосов
/ 25 октября 2019

Я создаю SparkSession в Trait, и объект-компаньон расширяет эту особенность.

SparkSessionObject.scala
--------------------------------------------------------------------------------------------------------
import org.apache.spark.sql.SparkSession

trait SparkSessionObject {

  val brdaSession: SparkSession = SparkSession
    .builder()
    .appName("BRDATest")
    .getOrCreate()

}

object SparkSessionObject extends SparkSessionObject {}
--------------------------------------------------------------------------------------------------------

Всякий раз, когда мне нужен SparkSession или Spark.implicits, я расширяюсь из этого объекта-компаньона.

У меня есть 3 класса, каждый из которых вызывает другой, как показано ниже, все три расширяют объект Companion, указанный выше:

1) Class1 
--------------------------------------------------------------------------------------------------------
import SparkSessionObject.brdaSession.implicits._

object Class1 extends SparkSessionObject{

  def main(args: Array[String]): Unit = {
    .....
    CLASS2.class2method(sampelDF)
    .....
  }
}
--------------------------------------------------------------------------------------------------------

... вызывает Class2 прямо как часть вызова метода

CLASS2:
--------------------------------------------------------------------------------------------------------
import regrpt.utils.{SparkSessionObject}
import SparkSessionObject.brdaSession.implicits._

object CLASS2 extends SparkSessionObject{

  def class2method(df: DataFrame): Unit = {
    .....
    finalAllocOneDF.foreachPartition(partition => {
      if (partition.take(1).nonEmpty) {
        CLASS3.class3method(partition)
      }
    })
    }
}
--------------------------------------------------------------------------------------------------------

SparkSession и Spark отлично работают до сих пор

2) Class 2 
 calls Class 3 as part of a ForEachPartition


CLASS3:
--------------------------------------------------------------------------------------------------------
import SparkSessionObject.brdaSession.implicits._

object CLASS3 extends SparkSessionObject{
  def class3Method(it: Iterator[Row]): Unit = {
    println("CLASS3")
  }
}
--------------------------------------------------------------------------------------------------------

SparkSession and Spark implicits FAIL here, in the sense 
a) The existing SparkSession is stopped, created again and moves to RUNNING state 
b) But then it ends with below error without ever calling CLASS3 method (print statement)

Есть идеи почему?

--------------------------------------------------------------------------------------------------------
Error:
--------------------------------------------------------------------------------------------------------
    19/10/25 12:28:34 ERROR spark.SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:368)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
    at regrpt.utils.SparkSessionObject$class.$init$(SparkSessionObject.scala:10)
    at regrpt.tests.getAllocOneDF$.<init>(getAllocOneDF.scala:11)
    at regrpt.tests.getAllocOneDF$.<clinit>(getAllocOneDF.scala)
    at regrpt.transformations.AllocationOneView$$anonfun$processNettedDFs$1.apply(AllocationOneView.scala:342)
    at regrpt.transformations.AllocationOneView$$anonfun$processNettedDFs$1.apply(AllocationOneView.scala:337)
19/10/25 12:28:34 INFO executor.Executor: Finished task 128.0 in stage 64.0 (TID 8645). 23743 bytes result sent to driver
19/10/25 12:28:34 ERROR util.Utils: Uncaught exception in thread Executor task launch worker for task 8635
java.lang.NullPointerException
    at regrpt.tests.getAllocOneDF$.<init>(getAllocOneDF.scala:11)
    at regrpt.tests.getAllocOneDF$.<clinit>(getAllocOneDF.scala)
    at regrpt.transformations.AllocationOneView$$anonfun$processNettedDFs$1.apply(AllocationOneView.scala:342)
    at regrpt.transformations.AllocationOneView$$anonfun$processNettedDFs$1.apply(AllocationOneView.scala:337)
19/10/25 12:28:34 ERROR executor.Executor: Exception in task 66.1 in stage 64.0 (TID 8635)
java.lang.ExceptionInInitializerError
    at regrpt.transformations.AllocationOneView$$anonfun$processNettedDFs$1.apply(AllocationOneView.scala:342)
    at regrpt.transformations.AllocationOneView$$anonfun$processNettedDFs$1.apply(AllocationOneView.scala:337)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
    at regrpt.tests.getAllocOneDF$.<init>(getAllocOneDF.scala:11)
    at regrpt.tests.getAllocOneDF$.<clinit>(getAllocOneDF.scala)
    ... 14 more
...