Я написал задание MapReduce, в котором для набора данных были учтены ngram. Результаты представлены в сотнях файлах размером 300 МБ, <ngram>\t<count>
. Я хочу объединить их в один результат, но мои несколько попыток объединения потерпели неудачу («трекер задач исчез»). У меня был тайм-аут в 8 часов, и этот сбой произошел около 8,5 часов, поэтому может быть связано. У меня было # Reducer = 5 (так же, как # узлов). Может быть, мне просто нужно оставить больше времени, хотя ошибка, похоже, не указывает на это. Я подозреваю, что мои узлы перегружаются и перестают отвечать на запросы. Моя теория заключается в том, что мой редуктор может использовать некоторую оптимизацию.
Я использую cat
для моего картографа и следующий скрипт на python для моего редуктора:
#!/usr/bin/env python
import sys
counts = {}
for line in sys.stdin:
line = line.strip()
key, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if key not in counts:
counts[key] = 0
counts[key] += count
for key in sorted(counts.keys()):
print '%s\t%s'% (key, counts[key])
Обновление:
Как я намекал в одном из моих комментариев, я запутался в том, что сортировка происходит в Hadoop автоматически. В веб-интерфейсе состояния редуктора отображается несколько разных фаз, в том числе «сортировка» и «уменьшение». Исходя из этого, я предполагаю, что Hadoop сортирует выходные данные сопоставления перед отправкой для уменьшения, но неясно, выполняется ли сортировка по всем данным, отправляемым в редуктор, или по каждому файлу до его уменьшения. Другими словами, мой картограф берет 100 полей, разбивает их на 400 выходов, каждый из которых просто cat
, направляя их в редуктор, затем редукторы (всего 5) получают эти 80 потоков. Объединяет ли сортировка все 80 или сортирует 1, уменьшает ее; так далее? Основываясь на графиках, которые могут явно не указывать на фактическое поведение, процесс сортировки происходит перед любым сокращением. Если сортировка действительно сортирует все входные файлы, то я могу упростить мой редуктор, чтобы не хранить словарь всех подсчетов, и просто распечатать пару ключ-итогCount после изменения ключа.
Что касается использования сумматора, я не думаю, что это было бы полезно в моем случае, так как данные, которые я сокращаю, уже были уменьшены в 100 файлах, которые я пытаюсь объединить. Поскольку мои # узлы = # редукторы (5 и 5), объединить нечего, что редуктор еще не делает.