Я пытаюсь понять искровой код, и теперь я ищу SparkSession
Насколько я понимаю, метод getOrCreate в Builder в SparkSession
Object, возвращает существующий SparkSession
или Createэто впервые, и это потокобезопасно. Таким образом, SparkSession
, возвращаемое getOrCreate
, равно Singleton
Но я видел код, который изменяет конфигурацию Spark в SparkSession. Кажется, что если несколько потоков пытаются вызвать getOrCreate одновременно, конфигурация других потоков влияет на конфигурацию другого потока.
Например, предположим, что есть threadA
и threadB
, threadA
'конфигурация s spark.default.parallelism
= 10000, а конфигурация threadB
для того же - 1.
Код, подобный этому
Thread threadA = new Thread(() -> {
val spark = SparkSession.builder().appName("appA").config("spark.default.parallelism", "10000").getOrCreate()
spark.sql("...")
...
}).start()
Thread threadB = new Thread(() -> {
val spark = SparkSession.builder().appName("appB").config("spark.default.parallelism", "1").getOrCreate()
spark.sql("...")
...
}).start()
, и эти потоки в состоянии гонки получают SparkSession
,наконец threadA
создает SparkSession и затем переключается на threadB
. затем, наконец, конфигурация для SparkSession spark.default.parallelism
= 1, хотя threadA
кодовый блок ожидает, что это 10000.
Правильно ли я понял?
К вашему сведению, искровой код здесь.
private val activeThreadSession = new InheritableThreadLocal[SparkSession]
private val defaultSession = new AtomicReference[SparkSession]
...
def getOrCreate(): SparkSession = synchronized { //this block executed first by threadA and then threadB
...
var session = activeThreadSession.get() //threadA, threadB tries to get session but not exists
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
SparkSession.synchronized {
session = defaultSession.get() //threadA tries to get it but not exists / threadB get this
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
//threadA execute this logic / threadB will not be reached here
val sparkContext = userSuppliedContext.getOrElse {
...
}
...
session = new SparkSession(sparkContext, None, None, extensions)
options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) } //this is threadA's configuration
setDefaultSession(session) //set with threadA's configuration
setActiveSession(session) //set with threadA's configuration