Выход редуктора MRJob - PullRequest
       8

Выход редуктора MRJob

0 голосов
/ 10 декабря 2018

Есть ли способ сортировки вывода функции редуктора с помощью mrjob?

Я думаю, что функция ввода в редуктор сортируется по ключу, и я пытался использовать эту функцию для сортировки вывода, используя другой редукторкак показано ниже, где я знаю, что значения имеют числовые значения, я хочу подсчитать количество каждого ключа и отсортировать ключи в соответствии с этим счетом:

def mapper_1(self, key, line):
    key = #extract key from the line
    yield (key, 1)

def reducer_1(self, key, values):
    yield key, sum(values)

def mapper_2(self, key, count):
    yield ('%020d' % int(count), key)

def reducer_2(self, count, keys):
    for key in keys:
        yield key, int(count)

, но его вывод отсортирован неправильно!Я подозревал, что это странное поведение связано с манипулированием int s как string и пытался отформатировать его как эта ссылка говорит, но это не сработало!

ВАЖНОПРИМЕЧАНИЕ: Когда я использую отладчик, чтобы увидеть порядок вывода reducer_2, порядок правильный, но то, что выводится как вывод, является чем-то другим !!!

ВАЖНОЕ ПРИМЕЧАНИЕ 2: На другом компьютере та же программа с теми же данными возвращает выходные данные, отсортированные как и ожидалось!

1 Ответ

0 голосов
/ 18 декабря 2018

Вы можете отсортировать значения как целые числа во втором редукторе и затем преобразовать их в представление с дополнением нулями:

import re

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRWordFrequencyCount(MRJob):

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper_extract_words, combiner=self.combine_word_counts,
                reducer=self.reducer_sum_word_counts
            ),
            MRStep(
                reducer=self.reduce_sort_counts
            )
        ]

    def mapper_extract_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combine_word_counts(self, word, counts):
        yield word, sum(counts)

    def reducer_sum_word_counts(self, key, values):
        yield None, (sum(values), key)

    def reduce_sort_counts(self, _, word_counts):
        for count, key in sorted(word_counts, reverse=True):
            yield ('%020d' % int(count), key)

Что ж, это сортировка вывода в памяти, что может быть проблемой в зависимости отразмер входа.Но вы хотите, чтобы он был отсортирован, поэтому он должен быть как-то отсортирован.

...