Groupby и головная боль понимания списка в Python - PullRequest
0 голосов
/ 23 ноября 2011

Я получил это из учебника Hadoop. Это редуктор, который в основном берет (слово, количество) пар из стандартного ввода и суммирует их.

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_uppercount = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            pass

Теперь я хочу иметь возможность принимать кортежи (word, count1, count2), но это дело groupby и sum(int(count for current_word, count in group) совершенно не читается для меня. Как мне изменить этот чанк, чтобы он в основном продолжал делать то же, что и сейчас, но со вторым значением счетчика? То есть ввод (word, count1, count2) и вывод (word, count1, count2).

РЕДАКТИРОВАТЬ 1:

from itertools import groupby, izip
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 2)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            counts_a, counts_b = izip((int(count_a), int(count_b)) for current_word, count_a, count_b in group)
            t1, t2 = sum(counts_a), sum(counts_b)
            print "%s%s%d%s%d" % (current_word, separator, t1, separator, t2)
        except ValueError:
            pass

Это задание Hadoop, поэтому вывод выглядит так:

11/11/23 18:44:21 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob:  map 100%  reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob:  map 100%  reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob:  map 100%  reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!

Из журналов:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.

Ответы [ 2 ]

2 голосов
/ 23 ноября 2011

groupby

Это функция groupby из модуля itertools, задокументированная здесь .data «группируется» по результатам применения itemgetter(0) (экземпляр класса itemgetter из модуля operator, задокументированный здесь ) к каждому элементу.Он возвращает пары (ключевой результат, итератор над элементами с этим ключом).Таким образом, каждый раз в цикле current_word является «словом», общим для группы data строк (индекс-0, то есть первый элемент, извлеченный с помощью itemgetter), а group - этоитератор над строками data, которые начинаются с этого word.Как описано в документации к вашему коду, в каждой строке файла есть два слова: фактическое «слово» и количество (текст, предназначенный для интерпретации как числа)

сумма (int (количество)) для current_word, количество в группе)

То, что означает именно то, что написано : сумма целочисленных значений count, для каждого (current_word, count) пара найдена в group.Каждый group представляет собой набор строк из data, как описано выше.Итак, мы берем все строки, которые начинаются с current_word, преобразуем их строковые значения count в целые числа и складываем их.

Как изменить этот чанк, чтобы он в основном продолжал делать то, чтоэто делает прямо сейчас, но со вторым значением счетчика?Т.е. ввод (word, count1, count2) и вывод (word, count1, count2).

Хорошо, что вы хотите, чтобы каждый счет представлял, и куда вы хотите, чтобы данные приходилис?

Я возьму то, что я считаю самой простой интерпретацией: вы собираетесь изменить файл данных так, чтобы в каждой строке было три элемента, исобирать суммы из каждого столбца чисел отдельно.

groupby будет таким же, потому что мы по-прежнему группируем строки, которые получаем одинаково, и мы по-прежнему группируем их в соответствии с«слово».

Для части sum потребуется вычислить два значения: сумму для первого столбца чисел и сумму для второго столбца чисел.

Когда мы выполняем итерациюсвыше group мы получим наборы из трех значений, поэтому мы хотим распаковать их в три значения: например, current_word, group_a, group_b.Для каждого из них мы хотим применить целочисленное преобразование к обоим числам в каждой строке.Это дает нам последовательность пар чисел;если мы хотим добавить все первые числа и все вторые числа, то вместо этого мы должны создать пару последовательностей чисел.Для этого мы можем использовать другую функцию itertools с именем izip.Затем мы можем суммировать каждый из них по отдельности, снова распаковывая их в две отдельные переменные последовательности чисел и суммируя их.

Таким образом:

counts_a, counts_b = izip(
    (int(count_a), int(count_b)) for current_word, count_a, count_b in group
)
total_a, total_b = sum(counts_a), sum(counts_b)

Или мы могли бы просто сделатьпара подсчетов, выполнив тот же (x для y в z) трюк снова:

totals = (
    sum(counts)
    for counts in izip(
        (int(count_a), int(count_b)) for current_word, count_a, count_b in group
    )
)

Хотя этот результат будет несколько сложнее использовать в операторе print:)

0 голосов
/ 23 ноября 2011
from collections import defaultdict

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    counts = defaultdict(lambda: [0, 0])
    for word, (count1, count2) in data:
        values = counts[word]
        values[0] += count1
        values[1] += count2

    for word, (count1, count2) in counts.iteritems():
        print('{0}\t{1}\t{2}'.format(word, count1, count2))
...