Сбор первых N записей для каждого ключа в KeyVal RDD в PySpark - PullRequest
1 голос
/ 16 июня 2019

У меня есть СДР с большим количеством записей KeyVal. Один и тот же ключ будет присутствовать несколько раз, и я заинтересован в извлечении первых N записей для каждого ключа. Будучи новичком в Spark, я до сих пор не мог понять, как это сделать, поэтому любая помощь будет принята с благодарностью.

Ввод может выглядеть следующим образом:

rdd = sc.parallelize([('a',1),('a',2),('b',3),('a',5),('b',4),('b',6)])

Требуемый конечный результат, скажем, для первых 2 записей для каждой клавиши:

output = {'a':[1,2], 'b':[3,4]}

Если бы меня интересовали только первые n записей, я, конечно, мог бы просто применить take(n) к исходному СДР. Я ищу способ, похожий на функцию take(n), но повторяющийся над каждым ключом. Если бы я мог создать RDD, который просто содержит желаемое подмножество исходного RDD, это было бы хорошо. Впоследствии сбор в словарь не представляет особой проблемы.

Таким образом, промежуточный вывод (в стиле RDD) будет:

[('a',1),('a',2),('b',3),('b',4)]

Как мне этого добиться в PySpark?

Редактировать: Предложенный дублирующий вопрос определенно требовал решения с использованием reduceByKey, что в данном случае не является обязательным.

Ответы [ 3 ]

2 голосов
/ 16 июня 2019

Слушай, детка ...

nLength = 2
rdd.groupByKey().map(lambda x: (x[0], list(x[1])[:nLength]))

Объяснение:

rdd.groupByKey()  

Сгруппируйте СДР по ключу (в нашем случае: «a» или «b»). Результат:
[('a', ResultIterable), ('b', ResultIterable)]


.map(lambda x: (x[0], list(x[1])[:nLength]))

Эта часть создала кортеж: на левой стороне клавиша ('a' или 'b'), а на правой стороне мы создаем список из ResultIterable (x [1]), а затем вырезаем список от 0 до nLength ([: nLength]).

Наслаждайтесь!

0 голосов
/ 16 июня 2019

попробуйте это:

 def slice_list(s,no_of_values):
        return s[0:no_of_values]


rdd.groupByKey().map(lambda x: (x[0],slice_list( list(x[1]),2))).collect()
0 голосов
/ 16 июня 2019

Может быть, что-то простое, как это сделало бы работу:

rdd = sc.parallelize([('a',1),('a',2),('b',3),('a',5),('b',4),('b',6)])
n = 2
rdd.groupByKey().map(lambda x : (x[0], list(x[1])[:n])).collect()

Выход:

[('b', [3, 4]), ('a', [1, 2])]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...