Что произойдет, если несколько потоков попробуют getOrCreate с другой конфигурацией? - PullRequest
0 голосов
/ 20 октября 2019

Я пытаюсь понять искровой код, и теперь я ищу SparkSession

Насколько я понимаю, метод getOrCreate в Builder в SparkSession Object, возвращает существующий SparkSession или Createэто впервые, и это потокобезопасно. Таким образом, SparkSession, возвращаемое getOrCreate, равно Singleton

Но я видел код, который изменяет конфигурацию Spark в SparkSession. Кажется, что если несколько потоков пытаются вызвать getOrCreate одновременно, конфигурация других потоков влияет на конфигурацию другого потока.

Например, предположим, что есть threadA и threadB, threadA 'конфигурация s spark.default.parallelism = 10000, а конфигурация threadB для того же - 1.
Код, подобный этому

Thread threadA = new Thread(() -> {
    val spark = SparkSession.builder().appName("appA").config("spark.default.parallelism", "10000").getOrCreate()
    spark.sql("...")
    ...
}).start()

Thread threadB = new Thread(() -> {
    val spark = SparkSession.builder().appName("appB").config("spark.default.parallelism", "1").getOrCreate()
    spark.sql("...")
    ...
}).start()

, и эти потоки в состоянии гонки получают SparkSession,наконец threadA создает SparkSession и затем переключается на threadB. затем, наконец, конфигурация для SparkSession spark.default.parallelism = 1, хотя threadA кодовый блок ожидает, что это 10000.

Правильно ли я понял?

К вашему сведению, искровой код здесь.

private val activeThreadSession = new InheritableThreadLocal[SparkSession]
private val defaultSession = new AtomicReference[SparkSession]
...

def getOrCreate(): SparkSession = synchronized { //this block executed first by threadA and then threadB
  ...
  var session = activeThreadSession.get() //threadA, threadB tries to get session but not exists
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }

  SparkSession.synchronized {
    session = defaultSession.get() //threadA tries to get it but not exists / threadB get this
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }
    //threadA execute this logic / threadB will not be reached here
    val sparkContext = userSuppliedContext.getOrElse {
      ...
    }
    ...
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } //this is threadA's configuration
    setDefaultSession(session) //set with threadA's configuration
    setActiveSession(session) //set with threadA's configuration
...