Ошибка сломанной трубы приводит к сбою потоковой передачи задания Elastic MapReduce на AWS - PullRequest
7 голосов
/ 27 марта 2012

Локально все работает, когда я делаю следующее:

cat input | python mapper.py | sort | python reducer.py

Однако, когда я запускаю потоковое задание MapReduce в AWS Elastic Mapreduce, задание не завершается успешно.mapper.py проходит через часть пути (я знаю это из-за записи в stderr по пути).Преобразователь прерывается ошибкой «Broken Pipe», которую я могу извлечь из системного журнала попытки выполнения задачи после ее сбоя:

java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.io.IOException: log:null
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=hadoop
HADOOP_USER=null
last Hadoop input: |null|
last tool output: |text/html    1|
Date: Mon Mar 26 07:19:05 UTC 2012
java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written

Вот mapper.py.Обратите внимание, что я пишу в stderr для предоставления отладочной информации:

#!/usr/bin/env python

import sys
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging
        print '%s\t%s' % (web_page.header.content_type, 1)
    print >> sys.stderr, 'done' #For debugging
if __name__ == "__main__":
    main()

Вот что я получаю в stderr для попытки задания при запуске mapper.py:

text/html   1
text/html   1
text/html   1

По сути, цикл проходит 3 раза, а затем резко останавливается, без каких-либо ошибок Python.(Примечание: должно выводить тысячи строк).Даже неперехваченное исключение должно появиться в stderr.

Поскольку MapReduce прекрасно работает на моем локальном компьютере, я предполагаю, что это проблема того, как Hadoop работает с выводом, который я печатаю из mapper.py.Но я не знаю, в чем проблема.

Ответы [ 3 ]

9 голосов
/ 29 марта 2012

Ваш потоковый процесс (ваш скрипт на Python) преждевременно завершается.Это может быть связано с тем, что ввод данных завершен (например, интерпретация EOF) или проглоченным исключением.В любом случае, Hadoop пытается передать ваш сценарий через STDIN, но поскольку приложение завершено (и, следовательно, STDIN больше не является допустимым дескриптором файла), вы получаете ошибку BrokenPipe.Я бы предложил добавить трассировки stderr в ваш скрипт, чтобы увидеть, какая строка ввода вызывает проблему.Удачного кодирования,

-Geoff

6 голосов
/ 14 августа 2014

Это сказано выше, но позвольте мне попытаться уточнить - вы должны заблокировать стандартный ввод, даже если он вам не нужен!Это не то же самое, что каналы Linux, так что не позволяйте этому обмануть вас.Интуитивно понятно, что Streaming поддерживает ваш исполняемый файл, а затем говорит: «Подожди, пока я пойду за тобой».Если по какой-либо причине ваш исполняемый файл останавливается до того, как Streaming отправит вам 100% входных данных, Streaming говорит: «Эй, куда этот исполняемый файл ушел, когда я встал?!»Итак, вот некоторый код на Python, все, что он делает, это то, что делает cat, но вы заметите, что этот код не завершится, пока не будет обработан весь ввод, и это ключевой момент:

#!/usr/bin/python
import sys

while True:
    s = sys.stdin.readline()
    if not s:
        break
    sys.stdout.write(s)
1 голос
/ 29 марта 2012

У меня нет опыта работы с Hadoop на AWS, но у меня была такая же ошибка на обычном кластере hadoop - и в моем случае проблема заключалась в том, как я начал работать с python -mapper ./mapper.py -reducer ./reducer.py, а -mapper python mapper.py - нет.

Вы также, кажется, используете нестандартный пакет Python warc Вы отправляете необходимые файлы в streamjob?-cacheFiles или -cacheArchive может быть полезным.

...