То, что вы пытаетесь сделать, это: ваше значение (на входе K, V) является повторяемым , для которого вы хотите суммировать внутренний ключ и возвращать результат как =>
(external_key ( например 1,2 ) -> Список (Inner_Key ( Например, "K1", "K2" ), Summed_value))
Как вы видите суммурассчитывается на внутренний ключ-V , этого можно добиться,
Первый очистка элементов из каждого элемента списка
=> создание новогоключ как (внешний ключ, внутренний ключ)
=> суммирование по (external_key, inner_key) -> значение
=> изменение формата данных обратно на (external_key -> (inner_key, summed_value)))
=> наконец снова группировать по внешнему ключу
Я не уверен насчет Python-1, но считаю, что достаточно просто заменить синтаксис коллекции Scala на Python, и вотрешение
версия SCALA
scala> val keySeq = Seq((1,List(("K1",4),("K2",3),("K1",2))),
| (2,List(("K3",1),("K3",8),("K1",6))))
keySeq: Seq[(Int, List[(String, Int)])] = List((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))
scala> val inRdd = sc.parallelize(keySeq)
inRdd: org.apache.spark.rdd.RDD[(Int, List[(String, Int)])] = ParallelCollectionRDD[111] at parallelize at <console>:26
scala> inRdd.take(10)
res64: Array[(Int, List[(String, Int)])] = Array((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))
// And solution :
scala> inRdd.flatMap { case (i,l) => l.map(l => ((i,l._1),l._2)) }.reduceByKey(_+_).map(x => (x._1._1 ->(x._1._2,x._2))).groupByKey.map(x => (x._1,x._2.toList.sortBy(x =>x))).collect()
// RESULT ::
res65: Array[(Int, List[(String, Int)])] = Array((1,List((K1,6), (K2,3))), (2,List((K1,6), (K3,9))))
ОБНОВЛЕНИЕ => Python Solution
>>> data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
... (2,[('k3',1),('k3',8),('k1',6)])])
>>> data.collect()
[(1, [('k1', 4), ('k2', 3), ('k1', 2)]), (2, [('k3', 1), ('k3', 8), ('k1', 6)])]
# Similar operation
>>> data.flatMap(lambda x : [ ((x[0],y[0]),y[1]) for y in x[1]]).reduceByKey(lambda a,b : (a+b)).map(lambda x : [x[0][0],(x[0][1],x[1])]).groupByKey().mapValues(list).collect()
# RESULT
[(1, [('k1', 6), ('k2', 3)]), (2, [('k3', 9), ('k1', 6)])]