Эффективный способ уменьшить результаты MapReduce? - PullRequest
2 голосов
/ 05 ноября 2011

Я написал задание MapReduce, в котором для набора данных были учтены ngram. Результаты представлены в сотнях файлах размером 300 МБ, <ngram>\t<count>. Я хочу объединить их в один результат, но мои несколько попыток объединения потерпели неудачу («трекер задач исчез»). У меня был тайм-аут в 8 часов, и этот сбой произошел около 8,5 часов, поэтому может быть связано. У меня было # Reducer = 5 (так же, как # узлов). Может быть, мне просто нужно оставить больше времени, хотя ошибка, похоже, не указывает на это. Я подозреваю, что мои узлы перегружаются и перестают отвечать на запросы. Моя теория заключается в том, что мой редуктор может использовать некоторую оптимизацию.

Я использую cat для моего картографа и следующий скрипт на python для моего редуктора:

#!/usr/bin/env python
import sys

counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if key not in counts:
        counts[key] = 0
    counts[key] += count

for key in sorted(counts.keys()):
    print '%s\t%s'% (key, counts[key])

Обновление: Как я намекал в одном из моих комментариев, я запутался в том, что сортировка происходит в Hadoop автоматически. В веб-интерфейсе состояния редуктора отображается несколько разных фаз, в том числе «сортировка» и «уменьшение». Исходя из этого, я предполагаю, что Hadoop сортирует выходные данные сопоставления перед отправкой для уменьшения, но неясно, выполняется ли сортировка по всем данным, отправляемым в редуктор, или по каждому файлу до его уменьшения. Другими словами, мой картограф берет 100 полей, разбивает их на 400 выходов, каждый из которых просто cat, направляя их в редуктор, затем редукторы (всего 5) получают эти 80 потоков. Объединяет ли сортировка все 80 или сортирует 1, уменьшает ее; так далее? Основываясь на графиках, которые могут явно не указывать на фактическое поведение, процесс сортировки происходит перед любым сокращением. Если сортировка действительно сортирует все входные файлы, то я могу упростить мой редуктор, чтобы не хранить словарь всех подсчетов, и просто распечатать пару ключ-итогCount после изменения ключа.

Что касается использования сумматора, я не думаю, что это было бы полезно в моем случае, так как данные, которые я сокращаю, уже были уменьшены в 100 файлах, которые я пытаюсь объединить. Поскольку мои # узлы = # редукторы (5 и 5), объединить нечего, что редуктор еще не делает.

Ответы [ 2 ]

2 голосов
/ 10 декабря 2011

Проблема была в том, что я неправильно понял, как работает MapReduce.Все данные, поступающие в Редуктор, сортируются.Мой код выше был полностью неоптимизирован.Вместо этого я просто отслеживаю текущий ключ, а затем распечатываю предыдущий текущий, когда появляется новый ключ.

#!/usr/bin/env python
import sys

cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    # if new key, reset count, note current key, and output lastk key's result
    if key != cur_key:
        if cur_key is not None:
            print '%s\t%s'% (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count += count
# printing out final key if set
if cur_key:
    print '%s\t%s'% (cur_key, cur_key_count)
1 голос
/ 05 ноября 2011

Используйте top, чтобы убедиться, что ваш редуктор привязан к процессору, а не к вводу-выводу (возможно, вызывает переключение) во время работы.

8 часов / 20 заданий на хост - 24 минуты на задание 300 МБ

Вы могли бы использовать heapq так, чтобы структура данных, встроенная в память, сохранялась отсортированной: Смотрите раздел 8.4.1 из: http://docs.python.org/library/heapq.html

...