forEachPartition работает, но не mapPartition - PullRequest
0 голосов
/ 10 апреля 2019

У меня есть приложение для потокового воспроизведения, которое считывает поток Kafka и вставляет данные в базу данных.

Это фрагмент кода

eventDStream.foreachRDD { (rdd, time) =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // First example that works
      val accumulator = streamingContext.sparkContext.longAccumulator
      rdd.foreachPartition { records =>
        val count = Consumer.process(records)
        accumulator.add(count)
      }
      println(s"accumulated $accumulator.value")
      // do the same but aggregate count, does not work
      val results = rdd.mapPartitions(records => Consumer.processIterator(records))
      val x = results.fold(0)(_ + _)
      println(x)
      eventDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

В первой части я использовал foreachPartition с аккумулятором для подсчета количества успешных вставок

Во второй части я вычислил RDD[Int], представляющий число успешных вставок в каждом RDD, и агрегировал результат, используя функцию fold

Но вторая часть всегда печатает 0, а первая часть всегда делает именно то, что я хочу.

Можете ли вы показать мне, почему?

Спасибо

...