Функция AggregateByKey с парами ключ-значение в Pyspark - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть вопрос относительно агрегатного ключа в pyspark.

У меня есть набор данных RDD: premierRDD = [('Chelsea', ('2016–2017', 93)), ('Chelsea', ('2015–2016', 50))]

Я хочу суммировать баллы 50 и 93 с помощью функции aggegrateByKey, и мой ожидаемый результат должен быть: [('Chelsea', '2016–2017', (93,143)), ('Chelsea','2015–2016', (50,143))]

seqFunc = (lambda x, y: ('', x[0] + y[1]))
combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

premierAgg = premierMap.aggregateByKey((0,0), seqFunc, combFunc)

Однако вместо этого я получаю этот вывод: [('Chelsea', ('', 143))]

Может кто-топосоветуйте мне, как правильно использовать функцию aggregrateByKey?

1 Ответ

0 голосов
/ 22 февраля 2019

Я скорректировал ваш код для достижения желаемого результата.Для начала вам нужно сохранить значение "year" в seqFunc.Поэтому я добавил y[0] там.Затем необходимо изменить комбинацию, чтобы она содержала не только сумму, но и исходное значение в кортеже.Кроме того, значение года остается также.Это приведет к [('Chelsea', [(u'2016-2017', (93, 143)), (u'2015-2016', (50, 143))])], как я объяснил в комментарии, те же клавиши будут объединены.Для достижения вашего результата с 2 раза Челси вы можете просто использовать дополнительную функцию карты, как описано.

rdd = sc.parallelize([('Chelsea', (u"2016-2017", 93)), ('Chelsea', (u"2015-2016", 50))])
seqFunc = (lambda x, y: (y[0], x[0] + y[1]))
combFunc = (lambda x, y: [(x[0], (x[1],x[1] + y[1])),(y[0],(y[1],x[1]+y[1]))])

premierAgg = rdd.aggregateByKey((0,0), seqFunc,combFunc)
print premierAgg.map(lambda r: [(r[0], a) for a in r[1]]).collect()[0]

Выход:

[('Chelsea', (u'2016-2017', (93, 143))), ('Chelsea', (u'2015-2016', (50, 143)))]
...