Dataproc Hadoop MapReduce - не могу заставить его работать - PullRequest
2 голосов
/ 12 ноября 2019

Я в основном пытаюсь запустить свою первую подпрограмму Hadoop MapReduce, и я должен использовать Hadoop и MapReduce, поскольку я делаю это для проекта класса. Я хочу использовать Python для картографирования и редуктора, так как мне больше всего нравится этот язык, и он наиболее знаком моим коллегам. Я чувствовал, что самый простой способ настроить это через экземпляр Google DataProc, так что у меня это тоже работает. Я опишу, что я сделал и какие ресурсы использовал, но я относительно новичок в этом, и я могу что-то упустить.

Конфигурация Dataproc

Dataproc 1

Dataproc 2

Dataproc 3

И тогда я могуSSH в мой основной узел. У меня есть файлы mapper.py и reducer.py, хранящиеся в корзине Google Cloud Storage.

Код картографа и редуктора взят из этого сообщения в блоге Micheal Noll , модифицированного для работы с Python 3.

mapper.py:

#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")

reducer.py

#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)

Наконец, я ssh в мой мастерузел и затем проверьте мою версию Python:

hduser@data-604-m:~$ python
Python 3.7.3 (default, Mar 27 2019, 22:11:17) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

И я запускаю следующее (адаптировано из здесь ):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input gs://data-604-hadoop/books/pg20417.txt \
    -output gs://data-604-hadoop/output

Что приводит к следующему:

hduser@data-604-m:~$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar     -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py     -map
per mapper.py     -reducer reducer.py     -input gs://data-604-hadoop/books/pg20417.txt     -output gs://data-604-hadoop/output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.9.2.jar] /tmp/streamjob4601880105330541890.jar tmpDir=null
19/11/12 02:10:46 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:47 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:49 INFO mapred.FileInputFormat: Total input files to process : 1
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: number of splits:15
19/11/12 02:10:49 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher
.enabled
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573523684358_0002
19/11/12 02:10:50 INFO impl.YarnClientImpl: Submitted application application_1573523684358_0002
19/11/12 02:10:50 INFO mapreduce.Job: The url to track the job: http://data-604-m:8088/proxy/application_1573523684358_0002/
19/11/12 02:10:50 INFO mapreduce.Job: Running job: job_1573523684358_0002
19/11/12 02:10:58 INFO mapreduce.Job: Job job_1573523684358_0002 running in uber mode : false
19/11/12 02:10:58 INFO mapreduce.Job:  map 0% reduce 0%
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:19 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:20 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)

19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:28 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:30 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:37 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:39 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:40 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:48 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:49 INFO mapreduce.Job:  map 80% reduce 0%
19/11/12 02:11:50 INFO mapreduce.Job:  map 100% reduce 100%
19/11/12 02:11:50 INFO mapreduce.Job: Job job_1573523684358_0002 failed with state FAILED due to: Task failed task_1573523684358_0002_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0
19/11/12 02:11:50 INFO mapreduce.Job: Counters: 14
        Job Counters 
                Failed map tasks=19
                Killed map tasks=14
                Killed reduce tasks=5
                Launched map tasks=22
                Other local map tasks=14
                Rack-local map tasks=8
                Total time spent by all maps in occupied slots (ms)=885928
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=221482
                Total vcore-milliseconds taken by all map tasks=221482
                Total megabyte-milliseconds taken by all map tasks=453595136
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
19/11/12 02:11:50 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

И, честно говоря, я понятия не имею, что делать в этот момент. Я потратил много времени на это, и я чувствую, что у кирпичной стены, потому что я не уверен, что не так.

Я также пытался:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
    -mapper ./mapper.py \
    -reducer ./reducer.py \
    -input gs://data-604-hadoop/books/pg20417.txt \
    -output gs://data-604-hadoop/output

С похожими результатами.

Я ценю любую помощь.

Обновление: Я перепробовал еще несколько вещей, но безуспешно. Я попытался перенести свои скрипты на Python в кластер Hadoop. Затем я проверил их с head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py, и они работают. В комментариях ниже я упоминаю, что я посмотрел на свой Шебанг и внес изменения, но они также не увенчались успехом.

1 Ответ

1 голос
/ 14 ноября 2019

Здесь происходит несколько разных вещей, но главное - это сводится к тому, что не обязательно предполагать, что системная среда каждой задачи отображения / редуктора (работающей как контейнеры YARN) обязательно будет иметьта же системная среда, что и вошедшая в систему оболочка. Многие элементы намеренно будут отличаться в большинстве случаев (например, пути к классам Java и т. Д.). Обычно с Java-программами MapReduce это работает по назначению, поскольку в итоге вы получите схожие переменные окружения и путь к классу между кодом драйвера, который выполняется по команде hadoop jar, и кодом исполнителя, который выполняется на рабочих узлах в контейнерах YARN. Потоковая передача Hadoop - это нечто странное, поскольку в обычном использовании Hadoop это не так много, как у первоклассного гражданина.

В любом случае, главное, что вас поражает в этом случае, это то, что ваш Python по умолчанию во время входа в системук кластеру относится дистрибутив Conda и версия Python 3.7, но версия Python по умолчанию в вашей среде YARN, которая порождает задачи маппера / редуктора, - это фактически Python 2.7. Это печальное следствие некоторых устаревших соображений совместимости в Dataproc. Вы можете увидеть это в действии, взломав mapper.py, чтобы он выполнял роль дампа необходимой вам информации об окружении, например, попробуйте выполнить следующие команды, пока SSH-в кластере Dataproc:

echo foo > foo.txt
hdfs dfs -mkdir hdfs:///foo
hdfs dfs -put foo.txt hdfs:///foo/foo.txt
echo '#!/bin/bash' > info.sh
echo 'which python' >> info.sh
echo 'python --version 2>&1' >> info.sh
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
     -Dmapred.reduce.tasks=0 \
     -Dmapred.map.tasks=1 \
     -files info.sh \
     -input hdfs:///foo \
     -output hdfs:///info-output \
     -mapper ./info.sh

Ваша локальная среда будет показывать другую версию Python по сравнению с той, что напечатана по адресу hdfs: /// info-output:

$ bash info.sh
/opt/conda/bin/python
Python 3.7.3
$ hdfs dfs -cat hdfs:///info-output/*
/usr/bin/python 
Python 2.7.15+  

Это означает, что вы можете либо сделать совместимый ваш картограф / редуктор Python 2.7, ИЛИВы можете явно указать /opt/conda/bin/python в своем шебанге. В моей настройке, которая копировала ваши настройки (Dataproc 1.4-ubuntu18 плюс действие инициализации jupyter.sh), у меня работало следующее:

#!/opt/conda/bin/python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")

И редуктор:

#!/opt/conda/bin/python
"""reducer.py"""

from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)

Однако, другойСледует иметь в виду, что действие инициализации jupyter.sh является устаревшим, и вместо этого вам следует использовать Dataproc Jupyter Component . В любом случае вам может понадобиться сначала выполнить шаги info.sh, которые я описал выше, чтобы определить соответствующую среду Python для использования в ваших mapper.py и reducer.py.

Например, ванильный Dataproc1.4-debian9 без действия инициализации jupyter.sh фактически будет иметь зарегистрированный по умолчанию Python /opt/conda/default/bin/python вместо /opt/conda/bin/python. И образ Dataproc 1.3-debian9 будет иметь /usr/bin/python по умолчанию с Python 2.7.

...