Я хотел выполнить асинхронную работу на рабочих, но аккумуляторы не применяются на стороне драйвера, так как метод updateAccumulators уже был вызван.
Проблема в том, что updateAccumulators является частным, поэтому мой вопрос: есть ли способ заставить его использоваться или другой способ выполнить то, что мне нужно>
val partiotionsDone =rdd.sparkContext.longAccumulator("partiotionsDone")
rdd.foreachPartitionAsync { rows =>
Future {
val ingestion = ingestRows(rows)
ingestion onSuccess{
case _=> {
partiotionsDone.add(1)
}
}
}
runTaskEvery(5 seconds,
func= ()=> println("partitionsFinished:${partiotionsDone.value}"),
whenDone=doFinalWork)
runTaskEvery печатает только 0 значений - хотя отладчик показывает, что partiotionsDone.add (1) был выполнен успешно. Конечно, если я жду в будущем, все это работает, но весь смысл здесь не в том, чтобы сделать это