Я использую LongAccumulator для подсчета количества записей, которые я сохраняю в Cassandra.
object Main extends App {
val conf = args(0)
val ssc = StreamingContext.getStreamingContext(conf)
Runner.apply(conf).startJob(ssc)
StreamingContext.startStreamingContext(ssc)
StreamingContext.stopStreamingContext(ssc)
}
class Runner (conf: Conf) {
override def startJob(ssc: StreamingContext): Unit = {
accTotal = ssc.sparkContext.longAccumulator("total")
val inputKafka = createDirectStream(ssc, kafkaParams, topicsSet)
val rddAvro = inputKafka.map{x => x.value()}
saveToCassandra(rddAvro)
println("XXX:" + accTotal.value) //-->0
}
def saveToCassandra(upserts: DStream[Data]) = {
val rddCassandraUpsert = upserts.map {
record =>
accTotal.add(1)
println("ACC: " + accTotal.value) --> 1,2,3,4.. OK. Spark Web UI, ok too.
DataExt(record.data,
record.data1)}
rddCassandraUpsert.saveToCassandra(keyspace, table)
}
}
Я вижу, что код выполняется правильно, и я сохраняю данные в Cassandra, когда я наконец печатаю аккумулятор, значение равно 0, но если я распечатываю его на карте, я вижу правильные значения. Почему?
Я использую Spark 2.0.2 и выполняю из Intellj в локальном режиме. Я проверил пользовательский веб-интерфейс spark и вижу обновленный накопитель.