Я использую потоковую передачу искры и хочу создать аккумулятор, который будет подсчитывать количество экземпляров dstream.Я хочу увеличить счетчик внутри метода train, а затем я хочу использовать этот счетчик в другом методе с именем assign.Код, размещенный ниже, не работает.Я не могу найти именованный аккумулятор в интерфейсе искры, и когда я печатаю его значение вне метода, он всегда равен нулю.Если вы думаете, что есть более правильный способ сделать это (я имею в виду не с аккумулятором), пожалуйста, объясните мне, как.
var numInstances: LongAccumulator = null
def init(ssc:StreamingContext) : Unit = {
numInstances = ssc.sparkContext.longAccumulator("numInst")
}
def train(input: DStream[Example]): Unit = {
input.foreachRDD(rdd => {
rdd.foreach(ex => {
manager = manager.update(ex)
numInstances.add(1)
})
})
}
def assign: Array[Example] = {
if(numInstances.value <= sizeOption.getValue) {
//do something
}
else {
//do something else
}
}