Хорошо, ниже приведен способ сделать это с помощью ReduByKey и aggregateByKey.
Проблема, с которой вы столкнулись с aggregateByKey, заключается в том, что последняя функция отвечает за добавление двух аккумуляторов.Он должен возвращать ту же структуру, что и все другие функции, чтобы при добавлении другого нового аккумулятора (из другого раздела) он снова работал.
Это очень похоже на комбинироватьByKey, см. здесь .
rdd = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1),\
(u'hive', 1), (u'spark', 1), (u'python', 1), (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)])
print rdd.aggregateByKey( (0, 0), lambda acc, val: (acc[0] + 1,acc[1] + val),\
lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1])).collect()
print rdd.mapValues(lambda x: (1, x)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
[(u'spark ', (4, 4)), (u'java', (2, 2)), (u'hive ', (2, 2)), (u'python ', (2, 2))]
[(u'spark', (4, 4)), (u'java ', (2, 2)), (u'hive', (2, 2)), (u'python ', (2, 2))]
Если вы пытаетесь усреднить значения, вы можете добавить еще одно mapValues в концевот так:
print rdd.aggregateByKey( (0, 0),\
lambda acc, val: (acc[0] + 1,acc[1] + val),\
lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1]))\
.mapValues(lambda x: x[1] * 1.0 / x[0])\
.collect()
[(u'spark ', 1.0), (u'java', 1.0), (u'hive ', 1.0), (u'python', 1.0)]