ненастоящий список в писпарке - PullRequest
0 голосов
/ 11 июня 2018

Я пытаюсь использовать combineByKey, чтобы найти медиану для ключа для моего назначения (использование combineByKey является требованием для назначения), и я планирую использовать следующую функцию, чтобы вернуть (k, v) пары, где v = a список всех значений, связанных с одним и тем же ключом.После этого я планирую отсортировать значения и затем найти медиану.

data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])

rdd = data.combineByKey(lambda value: value, lambda c, v: median1(c,v), lambda c1, c2: median2(c1,c2))

def median1 (c,v):
    list = [c]
    list.append(v)
    return list

def median2 (c1,c2):
    list2 = [c1]
    list2.append(c2)
    return list2

Тем не менее, мой код выдает следующее:

[('A', [[2, [4, 9]], 3]), ('B', [10, 20])]

, где значение является вложенным списком.В любом случае, можно ли откатить значения в pyspark, чтобы получить

[('A', [2, 4, 9, 3]), ('B', [10, 20])]

Или есть другие способы найти медиану для каждого ключа, используя combineByKey?Спасибо!

Ответы [ 2 ]

0 голосов
/ 11 июня 2018

гораздо проще использовать collect_list в столбце данных.

from pyspark.sql.functions import collect_list

df = rdd.toDF(['key', 'values'])

key_lists = df.groupBy('key').agg(collect_list('values').alias('value_list'))
0 голосов
/ 11 июня 2018

Вы просто не сделали хороший сумматор из значения.

Вот ваш ответ:

data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])

def createCombiner(value):
    return [value]
def mergeValue(c, value):
    return c.append(value)
def mergeCombiners(c1, c2):
    return c1+c2

rdd = data.combineByKey(createCombiner, mergeValue, mergeCombiners)

[('A', [9, 4, 2, 3]), ('B', [10, 20])]

...