MapReduce попарное сравнение всех строк в нескольких файлах - PullRequest
2 голосов
/ 11 июля 2011

Я начинаю использовать mrjob для Python, чтобы преобразовать некоторые из моих давно работающих программ на Python в задания MapReduce hadoop.Я привел в действие простые примеры подсчета слов, и я концептуально понимаю пример «классификация текста».

Однако у меня возникли небольшие проблемы с определением шагов, которые мне нужно сделать, чтобы решить мою проблему.работает.

У меня есть несколько файлов (около 6000), каждый из которых имеет от 2 до 800 строк каждый.В этом случае каждая строка представляет собой простой разделенный пробелами «сигнал».Мне нужно сравнить соотношение между каждой строкой в ​​каждом файле и КАЖДОЙ другой строкой во ВСЕХ файлах (включая себя).Затем, основываясь на коэффициенте корреляции, я выведу результаты.

Пример одного файла:

1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...

Мне нужно выдать каждую LINE этого файла в паре с КАЖДОЙ ДРУГОЙ LINEдругой файл ... или я мог бы объединить все файлы в один файл, если это упростит задачу, но мне все равно потребуется парная итерация.

Я понимаю, как выполнять вычисления и как использовать последний шаг сокращенияагрегировать и фильтровать результаты.Трудность, с которой я сталкиваюсь, заключается в том, как перевести все попарные элементы в последовательные шаги, не читая все файлы в одном файле setp?Я думаю, что я мог бы подготовить входной файл заранее, который использует itertools.product, но этот файл будет чрезмерно большим.

1 Ответ

1 голос
/ 11 июля 2011

Ну, так как никто не придумал ответ, я опубликую свой текущий обходной путь на случай, если кому-то еще это понадобится. Я не уверен, насколько это «канонично» или эффективно, но пока это работает.

Я поставил имя файла в качестве первого элемента каждой строки файла, за которым следует \t, за которым следуют остальные данные. Для этого примера я просто использую одно число в каждой строке, а затем усредняю ​​их, как очень тривиальный пример.

Затем я сделал следующий шаг уменьшения карты в mrjob.

class MRAvgPairwiseLines(MRJob):

def input_mapper(self, _, value):
    """Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""

    fnum, val = value.split('\t')
    yield 'ALL', (fnum, val)

def input_reducer(self, key, values):

    for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
        yield fnum1, (fnum1, fnum2, val1, val2)

def do_avg(self, key, value):

    fnum1, fnum2, val1, val2 = value
    res = (float(val1)+float(val2))/float(2)
    yield key, (fnum2, res)

def get_max_avg(self, key, values):

    max_fnum, max_avg = max(values, key = lambda x: x[1])
    yield key, (max_fnum, max_avg)

def steps(self):
    return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
                self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]

Таким образом, все выходные данные функции input_mapper группируются в одну и ту же input_reducer, которая затем yield последовательных пар. Затем они проходят в нужные места, чтобы в итоге вернуть наибольшее среднее значение (которое на самом деле является самым большим элементом во всех других файлах).

Надеюсь, это кому-нибудь поможет.

...