Дочерний поток не видит обновления, сделанные основным потоком - PullRequest
0 голосов
/ 14 сентября 2018

Я реализую SparkHealthListener, расширяя класс SparkListener.

@Component
class ClusterHealthListener extends SparkListener with Logging {
  val appRunning = new AtomicBoolean(false)
  val executorCount = new AtomicInteger(0)

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) = {
    logger.info("Application Start called .. ")
    this.appRunning.set(true)
    logger.info(s"[appRunning = ${appRunning.get}]")
  }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = {
    logger.info("Executor add called .. ")
    this.executorCount.incrementAndGet()
    logger.info(s"[executorCount = ${executorCount.get}]")
  }
}

appRunning и executorCount - две переменные, объявленные в классе ClusterHealthListener.ClusterHealthReporterThread будет только читать значения.

@Component
class ClusterHealthReporterThread @Autowired() (healthListener: ClusterHealthListener) extends Logging {
  new Thread {
    override def run(): Unit = {
      while (true) {
          Thread.sleep(10 * 1000)
          logger.info("Checking range health")
          logger.info(s"[appRunning = ${healthListener.appRunning.get}] [executorCount=${healthListener.executorCount.get}]"
      }
    }
  }.start()
}

ClusterHealthReporterThread всегда сообщает об инициализированных значениях независимо от изменений, внесенных в переменную основным потоком?Что я делаю неправильно?Это потому, что я добавляю healthListener в ClusterHealthReporterThread?

Обновление

Я немного поиграл и похоже, что это как-то связано с тем, как я запускаю искровой прослушиватель.

ЕслиЯ добавляю прослушиватель spark следующим образом:

val sparkContext = SparkContext.getOrCreate(sparkConf) sparkContext.addSparkListener(healthListener)

Родительский поток всегда будет показывать appRunning как 'false', но правильно отображает количество исполнителей.Дочерний поток (репортер работоспособности) также будет показывать правильное количество исполнителей, но appRunning всегда сообщал о «ложном», как в основном потоке.

Тогда я наткнулся на это Почему SparkListenerApplicationStart никогда не запускается? и попытался установить слушатель на уровне конфигурации spark,

.set("spark.extraListeners", "HealthListener class path")

Если ясделать это, основной поток сообщит «true» для appRunning и сообщит правильное число исполнителей , но дочерний поток всегда сообщит «false» и значение «0» для исполнителей.

1 Ответ

0 голосов
/ 24 сентября 2018

Я не могу сразу увидеть, что здесь не так, возможно, вы нашли интересный крайний случай.

Я думаю, что комментарий @ m4gic может быть правильным, что библиотека журналов, возможно, кэширует эту интерполированную строку?Похоже, вы используете https://github.com/lightbend/scala-logging, который утверждает, что эта интерполяция "не влияет на поведение", так что, возможно, нет.Не могли бы вы последовать его предложению повторить попытку, не используя эту функцию, и сообщить об этом?

Вторая возможность: интересно, есть ли в системе только один ClusterHealthListener?Возможно, автоматическое подключение вызывает создание второго экземпляра?Можете ли вы зарегистрировать идентификаторы объектов из ссылки ClusterHealthListener в обоих местоположениях и убедиться, что они являются одним и тем же объектом?

Если ни одно из этих предложений не исправляет это, вы можете опубликоватьрабочий пример, с которым я могу поиграть?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...