Pyspark: применение метода «уменьшить по ключу» к значениям rdd - PullRequest
0 голосов
/ 20 июня 2019

После некоторых преобразований у меня получился rdd со следующим форматом:

[(0, [('a', 1), ('b', 1), ('b', 1), ('b', 1)])

(1, [('c', 1), ('d', 1), ('h', 1), ('h', 1)])]

Я не могу понять, как по существу "reduByKey ()" в части значений этого rdd.

Это то, чего я хотел бы достичь:

[(0, [('a', 1), ('b', 3)])

(1, [('c', 1), ('d', 1), ('h', 2)])]

Я изначально использовал .values ​​(), затем применял к нему результат ReduByKey, но потом я потерял свой оригинальный ключ (в этом случае 0 или 1).

Ответы [ 2 ]

1 голос
/ 20 июня 2019

Вы потеряете оригинальный ключ, потому что .values() получит только значение key-value подряд. Вы должны суммировать кортеж в строке.

from collections import defaultdict

def sum_row(row):
    result = defaultdict(int)
    for key, val in row[1]:
        result[key] += val
    return (row[0],list(result.items()))

data_rdd = data_rdd.map(sum_row)
print(data_rdd.collect())

# [(0, [('a', 1), ('b', 3)]), (1, [('h', 2), ('c', 1), ('d', 1)])]
0 голосов
/ 22 июня 2019

Хотя values дает СДР, reduceByKey работает со всеми значениями СДР не по строкам.

Вы также можете использовать groupby (требуется заказ) для достижения того же:

from itertools import groupby

distdata.map(lambda x: (x[0], [(a, sum(c[1]  for c in b)) for a,b in groupby(sorted(x[1]), key=lambda p: p[0]) ])).collect()
...