Я реализую SparkHealthListener, расширяя класс SparkListener.
@Component
class ClusterHealthListener extends SparkListener with Logging {
val appRunning = new AtomicBoolean(false)
val executorCount = new AtomicInteger(0)
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) = {
logger.info("Application Start called .. ")
this.appRunning.set(true)
logger.info(s"[appRunning = ${appRunning.get}]")
}
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = {
logger.info("Executor add called .. ")
this.executorCount.incrementAndGet()
logger.info(s"[executorCount = ${executorCount.get}]")
}
}
appRunning и executorCount - две переменные, объявленные в классе ClusterHealthListener.ClusterHealthReporterThread будет только читать значения.
@Component
class ClusterHealthReporterThread @Autowired() (healthListener: ClusterHealthListener) extends Logging {
new Thread {
override def run(): Unit = {
while (true) {
Thread.sleep(10 * 1000)
logger.info("Checking range health")
logger.info(s"[appRunning = ${healthListener.appRunning.get}] [executorCount=${healthListener.executorCount.get}]"
}
}
}.start()
}
ClusterHealthReporterThread всегда сообщает об инициализированных значениях независимо от изменений, внесенных в переменную основным потоком?Что я делаю неправильно?Это потому, что я добавляю healthListener в ClusterHealthReporterThread?
Обновление
Я немного поиграл и похоже, что это как-то связано с тем, как я запускаю искровой прослушиватель.
ЕслиЯ добавляю прослушиватель spark следующим образом:
val sparkContext = SparkContext.getOrCreate(sparkConf)
sparkContext.addSparkListener(healthListener)
Родительский поток всегда будет показывать appRunning как 'false', но правильно отображает количество исполнителей.Дочерний поток (репортер работоспособности) также будет показывать правильное количество исполнителей, но appRunning всегда сообщал о «ложном», как в основном потоке.
Тогда я наткнулся на это Почему SparkListenerApplicationStart никогда не запускается? и попытался установить слушатель на уровне конфигурации spark,
.set("spark.extraListeners", "HealthListener class path")
Если ясделать это, основной поток сообщит «true» для appRunning и сообщит правильное число исполнителей , но дочерний поток всегда сообщит «false» и значение «0» для исполнителей.