создание пары rdd из двух основанных на rdds счетчиков повторений первого rdd в pyspark? - PullRequest
0 голосов
/ 05 декабря 2018

Я создал 2 RDD, как показано ниже

rd2=sc.parallelize([['A', 'B','D'], ['B', 'C'], ['A', 'B'],['B']])
rd3=sc.parallelize([['A', 'B'],['B', 'C'],['B','D']])
rd2.collect()
[['A', 'B','D'], ['B', 'C'], ['A', 'B'],['B']]
rd3.collect()
[['A', 'B'], ['B', 'C'],['B','D']]

Теперь я хочу посчитать общий элемент в числе повторений обоих RDD в rd2 как значение в новом rd4, то есть

['A', 'B']является обычным для обоих rdd, но число повторений в rd2 равно 2.

мой ожидаемый rd4:

[(['A','B'],2),(['B','C'],1),(['B','D'],1)]

1 Ответ

0 голосов
/ 05 декабря 2018

Вы можете проверить количество повторений, используя countByKey на rdd, оно вернет defaultdict.

Но вы сказали, что хотите получить результат как rdd, так что вы можете использовать вместо него функцию reduceByKey.

я создам rdd так же, как ваш

rd2=sc.parallelize([['A', 'B'], ['B', 'C'], ['A', 'B'],['B']])

rd2.map(lambda x: (tuple(x),1)).reduceByKey(lambda x,y: x+y).collect()
[(('B',), 1), (('A', 'B'), 2), (('B', 'C'), 1)]

теперь у вас есть выходной rdd в виде структуры (tuple,count), вы можете изменить его на список с помощью функции map.

rd2.map(lambda x: (tuple(x),1)).reduceByKey(lambda x,y: x+y).map(lambda x: (list(x[0]),x[1])).collect()
[(['B'], 1), (['A', 'B'], 2), (['B', 'C'], 1)] 

Надеюсь, это решит вашу проблему.

...