Сокращение значений в списках (ключ, значение) СДР, учитывая, что эти списки являются значениями в другом списке (ключ, значение) СДР - PullRequest
1 голос
/ 01 октября 2019

Я какое-то время кувыркаюсь над этим - буду очень признателен за любые предложения! Извините за длинное название, я надеюсь, что короткий пример, который я построю ниже, объяснит это намного лучше.

Допустим, у нас есть СДР следующей формы:

data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
           (2,[('k3',1),('k3',8),('k1',6)])])
data.collect()

Вывод:

[(1, [('k1', 4), ('k2', 3), ('k1', 2)]),
 (2, [('k3', 1), ('k3', 8), ('k1', 6)])]

Я собираюсь сделать следующее с самым глубоким списком (key, val) RDD

.reduceByKey(lambda a, b: a + b)

(т.е. уменьшить значения этих RDD по ключу, чтобы получить сумму наключ при сохранении результата, сопоставленного с ключами начального RDD более высокого уровня, что приведет к следующему выводу):

[(1, [('k1', 6), ('k2', 3)]),
 (2, [('k3', 9), ('k1', 6)])]

Я относительно новичок в PySpark и, возможно, здесь что-то упущено, но япопробовал много разных подходов к этому, но по существу не может найти способ получить доступ и уменьшитьByKey (RDD) ключей (key, val) в списке, который сам по себе является значением другого RDD.

Большое спасибо заранее!

Денис

Ответы [ 3 ]

1 голос
/ 02 октября 2019

То, что вы пытаетесь сделать, это: ваше значение (на входе K, V) является повторяемым , для которого вы хотите суммировать внутренний ключ и возвращать результат как =>

(external_key ( например 1,2 ) -> Список (Inner_Key ( Например, "K1", "K2" ), Summed_value))

Как вы видите суммурассчитывается на внутренний ключ-V , этого можно добиться,

Первый очистка элементов из каждого элемента списка

=> создание новогоключ как (внешний ключ, внутренний ключ)

=> суммирование по (external_key, inner_key) -> значение

=> изменение формата данных обратно на (external_key -> (inner_key, summed_value)))

=> наконец снова группировать по внешнему ключу

Я не уверен насчет Python-1, но считаю, что достаточно просто заменить синтаксис коллекции Scala на Python, и вотрешение

версия SCALA

scala> val keySeq = Seq((1,List(("K1",4),("K2",3),("K1",2))),
     | (2,List(("K3",1),("K3",8),("K1",6))))
keySeq: Seq[(Int, List[(String, Int)])] = List((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))

scala> val inRdd = sc.parallelize(keySeq)
inRdd: org.apache.spark.rdd.RDD[(Int, List[(String, Int)])] = ParallelCollectionRDD[111] at parallelize at <console>:26

scala> inRdd.take(10)
res64: Array[(Int, List[(String, Int)])] = Array((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))


// And solution :
scala> inRdd.flatMap { case (i,l) => l.map(l => ((i,l._1),l._2)) }.reduceByKey(_+_).map(x => (x._1._1 ->(x._1._2,x._2))).groupByKey.map(x => (x._1,x._2.toList.sortBy(x =>x))).collect()

// RESULT ::
res65: Array[(Int, List[(String, Int)])] = Array((1,List((K1,6), (K2,3))), (2,List((K1,6), (K3,9))))

ОБНОВЛЕНИЕ => Python Solution

>>> data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
...            (2,[('k3',1),('k3',8),('k1',6)])])
>>> data.collect()
[(1, [('k1', 4), ('k2', 3), ('k1', 2)]), (2, [('k3', 1), ('k3', 8), ('k1', 6)])]

# Similar operation

>>> data.flatMap(lambda x : [ ((x[0],y[0]),y[1]) for y in x[1]]).reduceByKey(lambda a,b : (a+b)).map(lambda x : [x[0][0],(x[0][1],x[1])]).groupByKey().mapValues(list).collect()

# RESULT 
[(1, [('k1', 6), ('k2', 3)]), (2, [('k3', 9), ('k1', 6)])]
0 голосов
/ 02 октября 2019

использовать mapValues ​​() + itertools.groupby () :

from itertools import groupby

data.mapValues(lambda x: [ (k, sum(f[1] for f in g)) for (k,g) in groupby(sorted(x), key=lambda d: d[0]) ]) \
    .collect()
#[(1, [('k1', 6), ('k2', 3)]), (2, [('k1', 6), ('k3', 9)])]

с itertools.groupby , мы используем первоеэлемент кортежа в виде сгруппированного ключа k и суммирование 2-го элемента из кортежа в каждом g.

Редактирование: для большого набора данных, сортировка с помощью itertools.groupbyэто дорого, просто напишите функцию без сортировки для обработки того же:

def merge_tuples(x):
    d = {}
    for (k,v) in x: 
        d[k] = d.get(k,0) + v
    return d.items()

data.mapValues(merge_tuples).collect()
#[(1, [('k2', 3), ('k1', 6)]), (2, [('k3', 9), ('k1', 6)])]
0 голосов
/ 02 октября 2019

вам следует .map ваш набор данных вместо того, чтобы уменьшать, потому что количество строк в вашем примере такое же, как в исходном наборе данных, внутри карты вы можете уменьшить значения в виде списка Python

...