У меня есть приложение для потокового воспроизведения, которое считывает поток Kafka и вставляет данные в базу данных.
Это фрагмент кода
eventDStream.foreachRDD { (rdd, time) =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// First example that works
val accumulator = streamingContext.sparkContext.longAccumulator
rdd.foreachPartition { records =>
val count = Consumer.process(records)
accumulator.add(count)
}
println(s"accumulated $accumulator.value")
// do the same but aggregate count, does not work
val results = rdd.mapPartitions(records => Consumer.processIterator(records))
val x = results.fold(0)(_ + _)
println(x)
eventDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
В первой части я использовал foreachPartition
с аккумулятором для подсчета количества успешных вставок
Во второй части я вычислил RDD[Int]
, представляющий число успешных вставок в каждом RDD
, и агрегировал результат, используя функцию fold
Но вторая часть всегда печатает 0, а первая часть всегда делает именно то, что я хочу.
Можете ли вы показать мне, почему?
Спасибо