аккумулятор в качестве счетчика в искровом потоке - PullRequest
0 голосов
/ 12 мая 2018

Я использую потоковую передачу искры и хочу создать аккумулятор, который будет подсчитывать количество экземпляров dstream.Я хочу увеличить счетчик внутри метода train, а затем я хочу использовать этот счетчик в другом методе с именем assign.Код, размещенный ниже, не работает.Я не могу найти именованный аккумулятор в интерфейсе искры, и когда я печатаю его значение вне метода, он всегда равен нулю.Если вы думаете, что есть более правильный способ сделать это (я имею в виду не с аккумулятором), пожалуйста, объясните мне, как.

var numInstances: LongAccumulator = null

def init(ssc:StreamingContext) : Unit = {
numInstances = ssc.sparkContext.longAccumulator("numInst")
}

def train(input: DStream[Example]): Unit = {
input.foreachRDD(rdd => {
  rdd.foreach(ex => {
    manager = manager.update(ex)
    numInstances.add(1) 
   })
})
}

def assign: Array[Example] = {
 if(numInstances.value <= sizeOption.getValue) {
  //do something
 }
 else {
  //do something else
 }
}

1 Ответ

0 голосов
/ 12 мая 2018

Поскольку функция, переданная в foreachRDD, выполняется в драйвере, вы можете переписать train следующим образом:

var totalCount = 0L
def train(input: DStream[Example]): Unit = {
  input.foreachRDD { rdd =>
   totalCount += rdd.count() 
   rdd.foreach { ex =>
     manager = manager.update(ex)
   }
  }
}

Будьте внимательны со значением totalCount - изменения не распространяютсяSpark и действует только на драйвере, поэтому вы не можете использовать его в коде, выполняемом на исполнителях.

...