Этого можно достичь, используя искровые RDD, например:
Я заново создал ваши данные для создания RDD:
val data_test =
List(Result("Paris", List( new NameValuePair("apples",10),new NameValuePair("oranges",20), new NameValuePair("peaches",30))),
Result("Paris", List( new NameValuePair("apples",20), new NameValuePair("oranges",30),new NameValuePair("peaches",40))),
Result("NY", List(new NameValuePair("apples",20),new NameValuePair("oranges",30), new NameValuePair("peaches",40))),
Result("NY", List(new NameValuePair("apples",40), new NameValuePair("oranges",30), new NameValuePair("peaches",10))),
Result("London", List(new NameValuePair("apples",20),new NameValuePair("oranges",30),new NameValuePair("peaches",40))) )
Затем я создал RDD из data_test и применил к нему преобразования. , вот код:
val rdd_data = sc.parallelize(data_test)
val rdd_1 = rdd_data.map(x => ((x.key,x.pairs(0).name),x.pairs(0).value))
val rdd_2 = rdd_data.map(x => ((x.key,x.pairs(1).name),x.pairs(1).value))
val rdd_3 = rdd_data.map(x => ((x.key,x.pairs(2).name),x.pairs(2).value))
val rdd_final = rdd_1.union(rdd_2).union(rdd_3)
val rdd_reduce = rdd_final.reduceByKey((x,y) => x+y)
val rdd_transformed = rdd_reduce.map(x=>(x._1._1,(x._1._2,x._2))).groupByKey().map(x=>(x._1,x._2.toList))
rdd_transformed.foreach(println)
Полученный результат выглядит так:
(NY,List((peaches,50), (apples,60), (oranges,60)))
(London,List((apples,20), (peaches,40), (oranges,30)))
(Paris,List((oranges,50), (peaches,70), (apples,30)))
[РЕДАКТИРОВАТЬ ПОСЛЕ КОММЕНТАРИИ] Если количество пар меняется, вы можете определить функцию следующим образом:
def func(res : Result): List[((String,String),Long)] = {
var r = List[((String,String),Long)]()
var i = List[NameValuePair]()
for(i <- res.pairs){
val tt : ((String,String),Long)= ((res.key,i.name),i.value)
r = tt :: r
}
return r
}
Затем вы можете сразу перейти к строке, где я генерирую rdd_final выше, примерно так:
val rdd_final = rdd_data.flatMap(x=>func(x))
Затем выполнить другие инструкции таким же образом.