Было oop Streaming + arroung 700 MultipleOutput = случайный неработающий стандартный вывод - PullRequest
0 голосов
/ 05 мая 2020

Моя проблема

Используя потоковую передачу Had oop и Python, я пытаюсь разделить данные за год на 2 * 365 выходов, по одному выходу на каждые полдня.

Выполнение:

mapred \
    streaming \
    -libjars ccproject.jar \
    -files src/batch/python \
    -input /ccproject/raw/* \
    -output /test \
    -outputformat ccproject.KeySplitOutputFormat \
    -mapper "python/job.py map" \
    -reducer "python/job.py reduce"

Ошибка этого кода непоследовательно :

  • Всегда при обработке всего года (кроме одного раза).
  • Редко при обработке всего 1 месяц.
  • Никогда, когда я удаляю -outputformat ccproject.KeySplitOutputFormat.

Код, который терпит неудачу, - это оператор python print с Broken pipe исключение.

Если я заменю свой редуктор фиктивной задачей cat, он также выйдет из строя из-за поломки канала (ошибка 141).

В чем вопрос

У меня его нет: (

Как Had oop newb, я чувствую, что это может быть проблема кластера / производительности / конфигурации. Но набор данных довольно мал (6M строк), мультиплексор на 700 выходов не кажется большим числом, и веб-интерфейс Yarn, похоже, указывает, что пиковое использование памяти составляет около 50%. Но опять же, я не знаю, где ищите индикатор работоспособности кластера.

Некоторый контекст

  • Я делаю это, чтобы лично испытать использование и настройку oop с нуля.
  • Я использую кластер с 3 узлами (1 мастер, 2 рабочих) на экземплярах EC2. У них есть только выделенный SSD 8 Go, и я пробовал экземпляры среднего (4Gio RAM) и xlarge (16Gio RAM).

Детали реализации

Класс KeySplitOutputFormat реализация простой мультиплекс на основе ключей. Точный рецепт здесь: { ссылка }.

Упрощенный для ясности мой код выглядит так:

def map(stream):
    fields = [ ... ]

    for date, minutes, value in mapred.iter_curated_fields(stream, fields):
        period = 'AM' if minutes< 12*60 else 'PM'

        # This line basicaly translate to  print "{}-{}\t{}".format(date, time, value)
        mapred.send((date, period), value)

def reduce(stream):
    best_key_for_period = {}

    for period_key, value in mapred.iter_key_values(stream):
        if value > best_key_for_period.get(period_key, 0):
            best_key_for_period[period_key] = value

    for (date, period), value in best_key .iteritems():
        mapred.send((date, period), value)
...