Как найти сумму значений для каждого в соответствии с позицией индекса в Spark / Scala - PullRequest
0 голосов
/ 07 мая 2020
 scala>  val dataArray = Array("a,1|2|3","b,4|5|6","a,7|8|9","b,10|11|12")
 dataArray: Array[String] = Array(a,1|2|3, b,4|5|6, a,7|8|9, b,10|11|12)

 scala>  val dataRDD = sc.parallelize(dataArray)
 dataRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:26

 scala>  val mapRDD = dataRDD.map(rec => (rec.split(",")(0),rec.split(",")(1).split("\\|")))
 mapRDD: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[9] at map at 
 <console>:25

 scala> mapRDD.collect
 res20: Array[(String, Array[String])] = Array((a,Array(1, 2, 3)), (b,Array(4, 5, 6)), (a,Array(7, 8, 9)), (b,Array(10, 11, 12)))

 scala> mapRDD.reduceByKey((value1,value2) => List(value1(0) + value2(0)))
 <console>:26: error: type mismatch;
  found   : List[String]
  required: Array[String]
       mapRDD.reduceByKey((value1,value2) => List(value1(0) + value2(0)))

Я пробовал это дальше

 val finalRDD = mapRDD.map(elem => (elem._1,elem._2.mkString("#")))

scala> finalRDD.reduceByKey((a,b) => (a.split("#")(0).toInt + b.split("#")(0).toInt).toString )
 res31: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[14] at reduceByKey at <console>:26

 scala> res31.collect
res32: Array[(String, String)] = Array((b,14), (a,8))

Как видите, я не могу получить результат для всех индексов, мой код дает сумму только для одного индекса.

Мой ожидаемый результат ниже

Я хочу, чтобы сумма применялась на основе индекса, такая как сумма всех [0] и сумма всех [1]

(a,(8,10,12))
(b,(14,16,18))

пожалуйста, помогите

Ответы [ 2 ]

0 голосов
/ 07 мая 2020

Мне кажется, что ответ QuickSilver идеален

Я пробовал этот подход с помощью reduceByKey, но я вручную добавляю для каждого индекса, если индексов больше, то это не поможет код такой же, как и в моем вопросе

.....
.....
mapRDD = code as per Question 

scala>  val finalRDD =  mapRDD.map(elem => (elem._1, elem._2 match {
 |  case Array(a:String,b:String,c:String) => (a.toInt,b.toInt,c.toInt)
 | case _ => (100,200,300)
 | }
 | )
 | )

scala> finalRDD.collect
res39: Array[(String, (Int, Int, Int))] = Array((a,(1,2,3)), (b,(4,5,6)), (a,(7,8,9)), (b,(10,11,12)))


scala> finalRDD.reduceByKey((v1,v2) => (v1._1+v2._1,v1._2+v2._2,v1._3+v2._3))
res40: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ShuffledRDD[18] at reduceByKey at <console>:26

scala> res40.collect
res41: Array[(String, (Int, Int, Int))] = Array((b,(14,16,18)), (a,(8,10,12)))
0 голосов
/ 07 мая 2020

Используйте transpose и map, чтобы получить _.sum

import org.apache.spark.rdd.RDD

object RddExample {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess
    val sc = spark.sparkContext
//    import spark.implicits._
    val dataArray = Array("a,1|2|3","b,4|5|6","a,7|8|9","b,10|11|12")

    val dataRDD = sc.parallelize(dataArray)

    val mapRDD : RDD[(String, Array[Int])] = dataRDD.map(rec => (rec.split(",")(0),rec.split(",")(1)
      .split("\\|").map(_.toInt)))
//    val mapRdd : Array[(String, Array[String])] = mapRDD.collect

    val result : Array[(String, List[Int])] =  mapRDD.groupByKey().mapValues(itr => {
      itr.toList.transpose.map(_.sum)
    }).collect()
    println(result)

  }

}
...