Есть ли способ принудительно обновить искровые аккумуляторы? - PullRequest
0 голосов
/ 02 января 2019

Я хотел выполнить асинхронную работу на рабочих, но аккумуляторы не применяются на стороне драйвера, так как метод updateAccumulators уже был вызван.

Проблема в том, что updateAccumulators является частным, поэтому мой вопрос: есть ли способ заставить его использоваться или другой способ выполнить то, что мне нужно>

   val partiotionsDone =rdd.sparkContext.longAccumulator("partiotionsDone")
   rdd.foreachPartitionAsync { rows =>

   Future {
   val ingestion = ingestRows(rows)
   ingestion onSuccess{
    case _=> {
      partiotionsDone.add(1)
    }
 }
 }

 runTaskEvery(5 seconds, 
        func= ()=> println("partitionsFinished:${partiotionsDone.value}"),
        whenDone=doFinalWork)

runTaskEvery печатает только 0 значений - хотя отладчик показывает, что partiotionsDone.add (1) был выполнен успешно. Конечно, если я жду в будущем, все это работает, но весь смысл здесь не в том, чтобы сделать это

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...