Аккумулятор, я всегда получаю 0 значение - PullRequest
0 голосов
/ 10 мая 2018

Я использую LongAccumulator для подсчета количества записей, которые я сохраняю в Cassandra.

object Main extends App {
  val conf = args(0)
  val ssc = StreamingContext.getStreamingContext(conf)
  Runner.apply(conf).startJob(ssc)

  StreamingContext.startStreamingContext(ssc)
  StreamingContext.stopStreamingContext(ssc)
}
class Runner (conf: Conf) {
    override def startJob(ssc: StreamingContext): Unit = {
        accTotal = ssc.sparkContext.longAccumulator("total")
        val inputKafka = createDirectStream(ssc, kafkaParams, topicsSet)
        val rddAvro = inputKafka.map{x => x.value()}
        saveToCassandra(rddAvro)
        println("XXX:" + accTotal.value) //-->0
    }


    def saveToCassandra(upserts: DStream[Data]) = {
        val rddCassandraUpsert = upserts.map {
            record =>
            accTotal.add(1)
            println("ACC: " + accTotal.value)  --> 1,2,3,4.. OK. Spark Web UI, ok too.
            DataExt(record.data,
                  record.data1)}
        rddCassandraUpsert.saveToCassandra(keyspace, table)
    }
}

Я вижу, что код выполняется правильно, и я сохраняю данные в Cassandra, когда я наконец печатаю аккумулятор, значение равно 0, но если я распечатываю его на карте, я вижу правильные значения. Почему?

Я использую Spark 2.0.2 и выполняю из Intellj в локальном режиме. Я проверил пользовательский веб-интерфейс spark и вижу обновленный накопитель.

1 Ответ

0 голосов
/ 10 мая 2018

Проблема, вероятно, здесь:

object Main extends App {
   ...

Spark не поддерживает приложения, расширяющие App, поэтому могут привести к недетерминированному поведению:

Обратите внимание, что приложения должны определять метод main () вместо расширения scala.App.Подклассы scala.App могут работать неправильно.

Вы всегда должны использовать стандартные приложения с main:

object Main {
    def main(args: Array[String]) {
      ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...