MapReduce on Hadoop: running Python scripts does not work
0 votes
09 April 2020

I am trying to run a MapReduce job on Hadoop. The file /input/another.txt contains the following text: A long time ago

When I run the following command:

hadoop jar /usr/local/Cellar/hadoop/3.2.1_1/libexec/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar -input /input -output /output -mapper "python mapper.py" -reducer "python reducer.py"

everything starts off as it should, but then I get this error output:

2020-04-09 11:48:11,207 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-04-09 11:48:12,221 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-04-09 11:48:12,325 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-04-09 11:48:12,325 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2020-04-09 11:48:12,351 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2020-04-09 11:48:13,013 INFO mapred.FileInputFormat: Total input files to process : 1
2020-04-09 11:48:13,094 INFO mapreduce.JobSubmitter: number of splits:1
2020-04-09 11:48:13,312 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local188205184_0001
2020-04-09 11:48:13,312 INFO mapreduce.JobSubmitter: Executing with tokens: []
2020-04-09 11:48:13,573 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2020-04-09 11:48:13,575 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2020-04-09 11:48:13,578 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
2020-04-09 11:48:13,581 INFO mapreduce.Job: Running job: job_local188205184_0001
2020-04-09 11:48:13,584 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2020-04-09 11:48:13,584 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2020-04-09 11:48:13,641 INFO mapred.LocalJobRunner: Waiting for map tasks
2020-04-09 11:48:13,646 INFO mapred.LocalJobRunner: Starting task: attempt_local188205184_0001_m_000000_0
2020-04-09 11:48:13,711 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2020-04-09 11:48:13,711 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2020-04-09 11:48:13,731 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
2020-04-09 11:48:13,732 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
2020-04-09 11:48:13,744 INFO mapred.MapTask: Processing split: hdfs://localhost:8020/input/another.txt:0+16
2020-04-09 11:48:13,796 INFO mapred.MapTask: numReduceTasks: 1
2020-04-09 11:48:13,927 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2020-04-09 11:48:13,927 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
2020-04-09 11:48:13,927 INFO mapred.MapTask: soft limit at 83886080
2020-04-09 11:48:13,927 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
2020-04-09 11:48:13,927 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
2020-04-09 11:48:13,932 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2020-04-09 11:48:13,944 INFO streaming.PipeMapRed: PipeMapRed exec [/Users/..././python, /Users/ChrisChross/.../mapper.py]
2020-04-09 11:48:13,949 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2020-04-09 11:48:13,950 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2020-04-09 11:48:13,950 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2020-04-09 11:48:13,951 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2020-04-09 11:48:13,951 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2020-04-09 11:48:13,951 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2020-04-09 11:48:13,952 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2020-04-09 11:48:13,952 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2020-04-09 11:48:13,952 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2020-04-09 11:48:13,953 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2020-04-09 11:48:13,953 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
2020-04-09 11:48:13,954 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
/Users/ChrisChross/..././python: line 1: A: command not found
/Users/ChrisChross/..././python: line 2: long: command not found
/Users/ChrisChross/..././python: line 3: 1: command not found

real    0m0.003s
user    0m0.000s
sys 0m0.001s
/Users/ChrisChross/..././python: line 4: ago: command not found
/Users/ChrisChross/..././python: line 5: syntax error near unexpected token `in'
/Users/ChrisChross/..././python: line 5: `in    1'
2020-04-09 11:48:14,011 INFO streaming.PipeMapRed: MRErrorThread done
2020-04-09 11:48:14,015 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-09 11:48:14,135 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2020-04-09 11:48:14,136 WARN streaming.PipeMapRed: {}
java.io.IOException: Stream closed
    at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:433)
    at java.io.OutputStream.write(OutputStream.java:116)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:532)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
2020-04-09 11:48:14,138 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
2020-04-09 11:48:14,142 INFO mapred.LocalJobRunner: map task executor complete.
2020-04-09 11:48:14,157 WARN mapred.LocalJobRunner: job_local188205184_0001
java.lang.Exception: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
2020-04-09 11:48:14,591 INFO mapreduce.Job: Job job_local188205184_0001 running in uber mode : false
2020-04-09 11:48:14,592 INFO mapreduce.Job:  map 0% reduce 0%
2020-04-09 11:48:14,594 INFO mapreduce.Job: Job job_local188205184_0001 failed with state FAILED due to: NA
2020-04-09 11:48:14,602 INFO mapreduce.Job: Counters: 0
2020-04-09 11:48:14,602 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

To keep my data private, I replaced the part of the path to the Python scripts after ChrisChross/ with ... So the actual path looks like

/Users/ChrisChross/.../mapper.py

Can anyone tell me why this program does not work as expected? I assume something, for whatever reason, cannot handle the input, hence the line 1: A: command not found errors etc.

My mapper.py:


#!/usr/bin/env python3
import sys

# Emit one "<word>\t1" pair for every whitespace-separated token on stdin.
for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print('{0}\t{1}'.format(key, value))
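As a quick local sanity check, the mapper logic can be exercised without Hadoop by feeding it the sample line in pure Python (a hypothetical harness; map_tokens is my name, not part of the job):

```python
import io

def map_tokens(stream):
    # Same logic as mapper.py: one "<word>\t1" pair per token.
    pairs = []
    for line in stream:
        for key in line.strip().split():
            pairs.append("{0}\t{1}".format(key, 1))
    return pairs

print(map_tokens(io.StringIO("A long time ago")))
# ['A\t1', 'long\t1', 'time\t1', 'ago\t1']
```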

My reducer.py:

#!/usr/bin/env python3

import sys
import time

counts = {}  # dictionary of word counts

start = time.time()

for input_line in sys.stdin:
    key, value = input_line.split()
    value = int(value)
    if key in counts:  # the word has been seen before
        counts[key] += value
    else:  # first occurrence of the word
        counts[key] = value

for s in counts:
    print("{0}\t{1}".format(s, counts[s]))

end = time.time()

# Write the timing to stderr so it does not end up in the job output.
print("{:.4f}".format(end - start), file=sys.stderr)
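The whole word-count pipeline can also be simulated in-process to check what the job should produce (a hypothetical helper; the counting logic mirrors reducer.py):

```python
def reduce_pairs(pairs):
    # Same logic as reducer.py: sum the counts per word.
    counts = {}
    for pair in pairs:
        key, value = pair.split()
        counts[key] = counts.get(key, 0) + int(value)
    return counts

# Map step inlined: one "<word>\t1" pair per token of the sample text.
pairs = ["{0}\t1".format(w) for w in "A long time ago".split()]
print(reduce_pairs(pairs))
# {'A': 1, 'long': 1, 'time': 1, 'ago': 1}
```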

If I run jps in the terminal, I get:

26435 NameNode
26676 SecondaryNameNode
26872 ResourceManager
26539 DataNode
27772 Jps
26972 NodeManager

My core-site.xml is configured as follows:

<configuration>
  <property>
  <name>hadoop.tmp.dir</name>
    <value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
    <description>A base for other temporary directories</description>
  </property>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:8020</value>
  </property>
</configuration>

My yarn-site.xml is configured as follows:

<configuration>
<!-- Site specific YARN configuration properties -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>

  <property>
      <name>yarn.nodemanager.env-whitelist</name>
         <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>
</configuration>

My hdfs-site.xml is configured as follows:

<configuration>

    <property>
      <name>dfs.replication</name>
      <value>1</value>
    </property>

    <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:/usr/local/Cellar/hadoop/hdfs/tmp/dfs/data/namenode</value>
    </property>

    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/Cellar/hadoop/hdfs/tmp/dfs/data/datanode</value>
    </property>

</configuration>

My mapred-site.xml is configured as follows:

<configuration>

  <property>
    <name>mapreduce.framework.name</name>
    <value>local</value>
  </property>

  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>

  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>

</configuration>

If I run it locally with

cat another.txt | python mapper.py | python reducer.py

it works as intended.

Thanks for your help, and sorry if you think this is a stupid question or badly formatted.

Any help is appreciated.
