I am trying to run a MapReduce job on Hadoop. The file /input/another.txt should contain the following text: Давным-давно
If I run the following command:
hadoop jar /usr/local/Cellar/hadoop/3.2.1_1/libexec/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar -input /input -output /output -mapper "python mapper.py" -reducer "python reducer.py"
everything starts up as it should, but then I get the following error output:
2020-04-09 11:48:11,207 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-04-09 11:48:12,221 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-04-09 11:48:12,325 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-04-09 11:48:12,325 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2020-04-09 11:48:12,351 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2020-04-09 11:48:13,013 INFO mapred.FileInputFormat: Total input files to process : 1
2020-04-09 11:48:13,094 INFO mapreduce.JobSubmitter: number of splits:1
2020-04-09 11:48:13,312 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local188205184_0001
2020-04-09 11:48:13,312 INFO mapreduce.JobSubmitter: Executing with tokens: []
2020-04-09 11:48:13,573 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2020-04-09 11:48:13,575 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2020-04-09 11:48:13,578 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
2020-04-09 11:48:13,581 INFO mapreduce.Job: Running job: job_local188205184_0001
2020-04-09 11:48:13,584 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2020-04-09 11:48:13,584 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2020-04-09 11:48:13,641 INFO mapred.LocalJobRunner: Waiting for map tasks
2020-04-09 11:48:13,646 INFO mapred.LocalJobRunner: Starting task: attempt_local188205184_0001_m_000000_0
2020-04-09 11:48:13,711 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2
2020-04-09 11:48:13,711 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
2020-04-09 11:48:13,731 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
2020-04-09 11:48:13,732 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
2020-04-09 11:48:13,744 INFO mapred.MapTask: Processing split: hdfs://localhost:8020/input/another.txt:0+16
2020-04-09 11:48:13,796 INFO mapred.MapTask: numReduceTasks: 1
2020-04-09 11:48:13,927 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2020-04-09 11:48:13,927 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
2020-04-09 11:48:13,927 INFO mapred.MapTask: soft limit at 83886080
2020-04-09 11:48:13,927 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
2020-04-09 11:48:13,927 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
2020-04-09 11:48:13,932 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2020-04-09 11:48:13,944 INFO streaming.PipeMapRed: PipeMapRed exec [/Users/..././python, /Users/ChrisChross/.../mapper.py]
2020-04-09 11:48:13,949 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2020-04-09 11:48:13,950 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2020-04-09 11:48:13,950 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2020-04-09 11:48:13,951 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2020-04-09 11:48:13,951 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2020-04-09 11:48:13,951 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2020-04-09 11:48:13,952 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2020-04-09 11:48:13,952 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2020-04-09 11:48:13,952 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2020-04-09 11:48:13,953 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2020-04-09 11:48:13,953 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
2020-04-09 11:48:13,954 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
/Users/ChrisChross/..././python: line 1: A: command not found
/Users/ChrisChross/..././python: line 2: long: command not found
/Users/ChrisChross/..././python: line 3: 1: command not found
real 0m0.003s
user 0m0.000s
sys 0m0.001s
/Users/ChrisChross/..././python: line 4: ago: command not found
/Users/ChrisChross/..././python: line 5: syntax error near unexpected token `in'
/Users/ChrisChross/..././python: line 5: `in 1'
2020-04-09 11:48:14,011 INFO streaming.PipeMapRed: MRErrorThread done
2020-04-09 11:48:14,015 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-04-09 11:48:14,135 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2020-04-09 11:48:14,136 WARN streaming.PipeMapRed: {}
java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:433)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:532)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
2020-04-09 11:48:14,138 INFO streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
2020-04-09 11:48:14,142 INFO mapred.LocalJobRunner: map task executor complete.
2020-04-09 11:48:14,157 WARN mapred.LocalJobRunner: job_local188205184_0001
java.lang.Exception: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
2020-04-09 11:48:14,591 INFO mapreduce.Job: Job job_local188205184_0001 running in uber mode : false
2020-04-09 11:48:14,592 INFO mapreduce.Job: map 0% reduce 0%
2020-04-09 11:48:14,594 INFO mapreduce.Job: Job job_local188205184_0001 failed with state FAILED due to: NA
2020-04-09 11:48:14,602 INFO mapreduce.Job: Counters: 0
2020-04-09 11:48:14,602 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
To protect my data I replaced the path to the Python scripts after ChrisChross/ with ... So it reads as
/Users/ChrisChross/.../mapper.py
Can anyone tell me why this program does not work as expected? I assume there is something that, for some reason, cannot handle the input, hence `line 1: A: command not found` etc.
My mapper.py:
#!/usr/bin/env/ python3
import sys

for line in sys.stdin:
    line = line.strip()
    keys = line.split()
    for key in keys:
        value = 1
        print('{0}\t{1}'.format(key, value))
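As a quick local sanity check (a hypothetical stand-in for piping the input file through the script, not part of the Hadoop job itself), the mapper's per-line splitting logic can be reproduced on an in-memory stream:

```python
import io

# Replicates the mapper's per-line logic on an in-memory stream,
# so its output can be inspected without Hadoop or a real stdin.
def map_lines(stream):
    pairs = []
    for line in stream:
        line = line.strip()
        for key in line.split():
            pairs.append((key, 1))
    return pairs

# Hypothetical sample mirroring the contents of /input/another.txt
sample = io.StringIO("Давным-давно\n")
for key, value in map_lines(sample):
    print('{0}\t{1}'.format(key, value))
```

Each word should come out as a tab-separated `word<TAB>1` pair, one per line, which is the format the reducer below expects on its stdin.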
My reducer.py:
#!/usr/bin/env/ python3
import sys
import time

string = {}  # dict of word counts
start = time.time()
for input_line in sys.stdin:
    key, value = input_line.split()
    value = int(value)
    if key in string.keys():  # the word is already counted
        string[key] += value
    else:  # the word is seen for the first time
        string[key] = 1
for s in string:
    print("{0}\t{1}".format(s, string[s]))
end = time.time()
print("{:.4f}\n".format(end - start))
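Similarly, the reducer's counting logic (extracted here into a standalone helper purely as an illustration) can be checked against hand-written mapper-style output. Note the helper adds `int(value)` on first sight of a word, which matches the original's `string[key] = 1` only because the mapper always emits a count of 1:

```python
import io

# Replicates the reducer's counting logic on an in-memory stream of
# tab-separated "word<TAB>count" lines, without Hadoop or a real stdin.
def reduce_lines(stream):
    counts = {}
    for input_line in stream:
        key, value = input_line.split()
        counts[key] = counts.get(key, 0) + int(value)
    return counts

sample = io.StringIO("a\t1\nb\t1\na\t1\n")
print(reduce_lines(sample))  # → {'a': 2, 'b': 1}
```

Because the counts are accumulated in a dict, this reducer does not require its input to be sorted by key, unlike reducers that rely on Hadoop's shuffle grouping.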
If I type jps in the terminal, I get:
26435 NameNode
26676 SecondaryNameNode
26872 ResourceManager
26539 DataNode
27772 Jps
26972 NodeManager
My core-site.xml is configured as:
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
<description>A base for other temporary directories</description>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:8020</value>
</property>
</configuration>
My yarn-site.xml is configured as:
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>
My hdfs-site.xml is configured as:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/usr/local/Cellar/hadoop/hdfs/tmp/dfs/data/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/usr/local/Cellar/hadoop/hdfs/tmp/dfs/data/datanode</value>
</property>
</configuration>
My mapred-site.xml is configured as:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>local</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>localhost:8021</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</property>
</configuration>
If I run it locally with
cat another.txt | python mapper.py | python reducer.py
it works as intended.
Thanks for your help, guys, and sorry if you think this is a silly question or in the wrong format.
Any help is appreciated.