Используя pyspark RDD .groupByKey извлеките элемент с наибольшим значением для каждой группы - PullRequest
0 голосов
/ 28 марта 2020
TOKEN_RE = re.compile(r"\b[\w']+\b")
def pos_tag_counter(line):
    toks = nltk.regexp_tokenize(line.lower(), TOKEN_RE)
    postoks = nltk.tag.pos_tag(toks)
    return postoks

pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], (x[1], x[0][0]))) \
    .groupByKey().map(lambda x : (x[0], list(x[1])))

У меня есть текстовый файл, который был сокращен до строк, чем слова, слова были подсчитаны и помечены меткой POS (часть речи). Итак, теперь у меня есть ряд кортежей (pos, (word, count)). POS является ключом. Мне нужно найти наиболее часто встречающееся слово для каждого POS.

[('NN', (1884, 'вашингтон')), («NN», (5, «звездный»)), («VBD», (563, «хранится»)), ('DT', (435969, 'the')), («JJ», (9300, «первый»)), («NN», (1256, «половина»)), («NN», (4028, «сезон»)),

Это мой первый проект pyspark, поэтому я не думаю, что полностью понимаю концепцию. Я использовал группу

[('VBD', [(563, «хранится»), (56715, «сказал»), (2640, «получил»), (12370, 's'), (55523, «был»), (62, «отрезал»),

В идеале вывод будет - (POS, count, word) в любом порядке, если кортеж показывает слово с наибольшим количеством в POS:

(«NN», 1884, «Вашингтон») («DT», 435969, «the») и др c.

Ответы [ 2 ]

1 голос
/ 28 марта 2020

Basi c идея равна groupByKey, затем найдите максимальное значение для каждой группы. Поскольку вам нужно самое длинное слово, вы можете определить ключ к методу max как длину слова.

rdd = sc.parallelize([('NN', (1884, 'washington')),
    ('NN', (5, 'stellar')),
    ('VBD', (563, 'kept')),
    ('DT', (435969, 'the')),
    ('JJ', (9300, 'first')),
    ('NN', (1256, 'half')),
    ('NN', (4028, 'season'))])

pos_count = rdd.groupByKey()
               .mapValues(lambda v: max(v, key=lambda x: len(x[1])))

print(pos_count.collect())
# [('DT', (435969, 'the')), ('VBD', (563, 'kept')), ('NN', (1884, 'washington')), ('JJ', (9300, 'first'))]
0 голосов
/ 28 марта 2020

Разве вы не можете просто изменить шаг отображения на map(lambda x: (x[0][1], x[1], x[0][0])), то есть:

pos_tag_counts = text.filter(lambda line: len(line) > 0) \
    .filter(lambda line: re.findall('^(?!URL).*', line)) \
    .flatMap(pos_tag_counter) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[0][1], x[1], x[0][0])) 
...