AggregateByKey в Pyspark не дает ожидаемый результат - PullRequest
0 голосов
/ 20 мая 2018

У меня есть СДР, в котором в качестве значения указаны 2 пары пар ключей и значений:

rdd5.glom().collect()

[[(u'hive ', 1), (u'python', 1), (u'spark ', 1), (u'pive', 1), (u'spark ', 1), (u'python', 1)], [(u'spark ', 1), (u'java', 1), (u'java ', 1), (u'spark', 1)]]

Когда я выполняю aggregateByKey

rdd6=rdd5.aggregateByKey((0,0), lambda acc,val: (acc[0]+1,acc[1]+val), lambda acc1,acc2 : (acc1[1]+acc2[1])/acc1[0]+acc2[0])

Это не дает мне ожидаемого результата:

Вывод:

[(u'python ', (2, 2)), (u'spark', 1), (u'java', (2, 2)), (u'hive ', (2, 2))]

Ожидается:

[(u'python', 1), (u'spark ', 1), (u'java', 1), (u'hive ', 1)]

Я вижу ключ, присутствующий в одном разделе, только недавая мне ожидаемый результат.Какие изменения я должен сделать, чтобы добиться этого?

1 Ответ

0 голосов
/ 20 мая 2018

Хорошо, ниже приведен способ сделать это с помощью ReduByKey и aggregateByKey.

Проблема, с которой вы столкнулись с aggregateByKey, заключается в том, что последняя функция отвечает за добавление двух аккумуляторов.Он должен возвращать ту же структуру, что и все другие функции, чтобы при добавлении другого нового аккумулятора (из другого раздела) он снова работал.

Это очень похоже на комбинироватьByKey, см. здесь .

rdd = sc.parallelize([(u'hive', 1), (u'python', 1), (u'spark', 1),\
    (u'hive', 1), (u'spark', 1), (u'python', 1), (u'spark', 1), (u'java', 1), (u'java', 1), (u'spark', 1)])

print rdd.aggregateByKey( (0, 0), lambda acc, val: (acc[0] + 1,acc[1] + val),\
                         lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1])).collect()

print rdd.mapValues(lambda x: (1, x)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()

[(u'spark ', (4, 4)), (u'java', (2, 2)), (u'hive ', (2, 2)), (u'python ', (2, 2))]

[(u'spark', (4, 4)), (u'java ', (2, 2)), (u'hive', (2, 2)), (u'python ', (2, 2))]

Если вы пытаетесь усреднить значения, вы можете добавить еще одно mapValues ​​в концевот так:

print rdd.aggregateByKey( (0, 0),\
                         lambda acc, val: (acc[0] + 1,acc[1] + val),\
                         lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1]))\
                        .mapValues(lambda x: x[1] * 1.0 / x[0])\
                        .collect()

[(u'spark ', 1.0), (u'java', 1.0), (u'hive ', 1.0), (u'python', 1.0)]

...