создать pyspark rdd с лямбдой - PullRequest
       12

создать pyspark rdd с лямбдой

0 голосов
/ 02 ноября 2019

Я хочу посчитать процент каждого числа.

rdd1=sc.parallelize([1,2,3,4,1,5,7,3])

Я попытался

rdd2=rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))

и получил rdd2.collect (): [(1,2), (2, 1), (3,2), (4,1), (5,1), (7,1)] затем

percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1.count())))
print(percentage.collect())

произошла ошибка на этапе печати, затем я попытался

percentage=rdd2.map(lambda x:(x[0],(x[1]/len(rdd1.collect()))))
print(percentage.collect())

также произошла ошибка в шаге печати. ​​

Ответы [ 2 ]

1 голос
/ 03 ноября 2019

Я извлекаю из того, что вы сказали, что вы хотите relative frequency каждого уникального члена СДР.

from operator import add

rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
count = rdd1.count()

rdd2=rdd1
    .map(lambda x: (x, 1))  # [(1,1),(2,1),(3,1),(4,1),(1,1),(5,1),(7,1),(3,1)]
    .reduceByKey(add)       # [(1,2),(2,1),(3,2),(4,1),(5,1),(7,1)]
    .mapValues( lambda vSum : vSum / count ) 

rdd2.collect()
# [(1,2/8),(2,1/8),(3,2/8),(4,1/8),(5,1/8),(7,1/8)]
1 голос
/ 03 ноября 2019

SPARK-5603 говорит, что вложенные операции RDD не поддерживаются.

Нельзя ссылаться на действие RDD внутри преобразования:

Если вы вызываете действие, которое являетсяcount() перед этим ваш код будет работать.

rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
rdd2 = rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))
rdd1_len = rdd1.count()
percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1_len)))

percentage.collect()
# [(1, 0.25), (2, 0.125), (3, 0.25), (4, 0.125), (5, 0.125), (7, 0.125)]
...