Моя проблема
Используя потоковую передачу Had oop и Python, я пытаюсь разделить данные за год на 2 * 365 выходов, по одному выходу на каждые полдня.
Выполнение:
mapred \
streaming \
-libjars ccproject.jar \
-files src/batch/python \
-input /ccproject/raw/* \
-output /test \
-outputformat ccproject.KeySplitOutputFormat \
-mapper "python/job.py map" \
-reducer "python/job.py reduce"
Ошибка этого кода непоследовательно :
- Всегда при обработке всего года (кроме одного раза).
- Редко при обработке всего 1 месяц.
- Никогда, когда я удаляю
-outputformat ccproject.KeySplitOutputFormat
.
Код, который терпит неудачу, - это оператор python print
с Broken pipe
исключение.
Если я заменю свой редуктор фиктивной задачей cat
, он также выйдет из строя из-за поломки канала (ошибка 141).
В чем вопрос
У меня его нет: (
Как Had oop newb, я чувствую, что это может быть проблема кластера / производительности / конфигурации. Но набор данных довольно мал (6M строк), мультиплексор на 700 выходов не кажется большим числом, и веб-интерфейс Yarn, похоже, указывает, что пиковое использование памяти составляет около 50%. Но опять же, я не знаю, где ищите индикатор работоспособности кластера.
Некоторый контекст
- Я делаю это, чтобы лично испытать использование и настройку oop с нуля.
- Я использую кластер с 3 узлами (1 мастер, 2 рабочих) на экземплярах EC2. У них есть только выделенный SSD 8 Go, и я пробовал экземпляры среднего (4Gio RAM) и xlarge (16Gio RAM).
Детали реализации
Класс KeySplitOutputFormat
реализация простой мультиплекс на основе ключей. Точный рецепт здесь: { ссылка }.
Упрощенный для ясности мой код выглядит так:
def map(stream):
fields = [ ... ]
for date, minutes, value in mapred.iter_curated_fields(stream, fields):
period = 'AM' if minutes< 12*60 else 'PM'
# This line basicaly translate to print "{}-{}\t{}".format(date, time, value)
mapred.send((date, period), value)
def reduce(stream):
best_key_for_period = {}
for period_key, value in mapred.iter_key_values(stream):
if value > best_key_for_period.get(period_key, 0):
best_key_for_period[period_key] = value
for (date, period), value in best_key .iteritems():
mapred.send((date, period), value)