Ну, так как никто не придумал ответ, я опубликую свой текущий обходной путь на случай, если кому-то еще это понадобится. Я не уверен, насколько это «канонично» или эффективно, но пока это работает.
Я поставил имя файла в качестве первого элемента каждой строки файла, за которым следует \t
, за которым следуют остальные данные. Для этого примера я просто использую одно число в каждой строке, а затем усредняю их, как очень тривиальный пример.
Затем я сделал следующий шаг уменьшения карты в mrjob
.
class MRAvgPairwiseLines(MRJob):
def input_mapper(self, _, value):
"""Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""
fnum, val = value.split('\t')
yield 'ALL', (fnum, val)
def input_reducer(self, key, values):
for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
yield fnum1, (fnum1, fnum2, val1, val2)
def do_avg(self, key, value):
fnum1, fnum2, val1, val2 = value
res = (float(val1)+float(val2))/float(2)
yield key, (fnum2, res)
def get_max_avg(self, key, values):
max_fnum, max_avg = max(values, key = lambda x: x[1])
yield key, (max_fnum, max_avg)
def steps(self):
return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]
Таким образом, все выходные данные функции input_mapper
группируются в одну и ту же input_reducer
, которая затем yield
последовательных пар. Затем они проходят в нужные места, чтобы в итоге вернуть наибольшее среднее значение (которое на самом деле является самым большим элементом во всех других файлах).
Надеюсь, это кому-нибудь поможет.