Как найти сумму в каждом разделе в Spark - PullRequest
2 голосов
/ 19 июня 2019

Я создал класс и использовал этот класс для создания СДР. Я хочу вычислить сумму LoudnessRate (член класса) для каждого раздела. Эта сумма будет позже использована для вычисления Mean LoudnessRate в каждом разделе. Я пробовал следующий код, но он не вычисляет сумму и возвращает 0.0 . Мой код

    object sparkBAT {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val N = 50
        val d = 5
        val MinVal = -10
        val MaxVal =  10
        val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
        val sc = new SparkContext(conf)

        val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
        val rdd = sc.parallelize(ba, numPartitions)

        var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
        rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
        arrSum foreach println
      }
    }


    class BAT (dim:Int, min:Double, max:Double) extends Serializable {    
      val random = new Random()
      var position      : List[Double]      =   List.fill(dim) (random.nextDouble() * (max-min)+min )
      var velocity      :List[Double]       =   List.fill(dim)( math.random)
      var PulseRate     : Double            =   0.1
      var LoudnessRate  :Double             =   0.95
      var frequency     :Double             =   math.random
      var fitness       :Double             =   math.random
      var BestPosition  :List[Double]       =   List.fill(dim)(math.random)
      var BestFitness   :Double             =   math.random 
    }

Ответы [ 2 ]

4 голосов
/ 19 июня 2019

Изменение моего комментария на ответ по запросу. Оригинальный комментарий

Вы изменяете arrSum в JVM исполнителя и печатаете его значения в JVM dirver. Вы можете сопоставить итераторы с одноэлементными итераторами и использовать команду collect для перемещения значений в драйвер. Кроме того, не используйте iterator.map для побочных эффектов, для этого предназначен iterator.foreach.

А вот пример кода, как это сделать. Сначала создайте СДР с двумя разделами, 0 -> 1,2,3 и 1 -> 4,5. Естественно, вам не понадобится это в реальном коде, но, поскольку поведение sc.parallelize меняется в зависимости от среды, это всегда будет создавать единые СДР для воспроизведения:

object DemoPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case num: Int => num
  }
}
val rdd = sc
  .parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
  .partitionBy(DemoPartitioner)
  .map(_._2)

А потом действительный трюк:

val sumsByPartition = rdd.mapPartitionsWithIndex {
  case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)

Выходы:

Map(0 -> 6, 1 -> 9)
0 голосов
/ 19 июня 2019

Проблема в том, что вы используете arrSum (обычную коллекцию), которая объявлена ​​в вашем драйвере и обновлена ​​в Executors.Всякий раз, когда вы делаете это, вам нужно использовать аккумуляторы.

Это должно помочь

...