Я пытаюсь решить проблему со списком перевернутых слов (для каждого слова вывод - это список имен файлов, содержащих это слово) в потоковой передаче Hadoop.Входные данные - это имя каталога, содержащего текстовые файлы.Я написал маппер и редуктор на Python, и они отлично работают при попытке с Unix трубопроводов.Однако при выполнении с использованием потоковой команды Hadoop коды выполняются, но в итоге задание не выполняется.Я подозреваю, что это что-то в коде Mapper, но не могу точно знать, в чем проблема.
Я новичок (поэтому, извините, если я ничего не понял правильно), использую обучение Cloudera наVMware Fusion.Мои исполняемые файлы Mapper и Reducer .py помещены в домашний каталог как в локальной системе, так и в hdfs.У меня есть каталог "Шекспир" на hdfs.Приведенная ниже команда unix pipe работает нормально.
эхо Шекспир |./InvertedMapper.py |сортировать |./InvertedReducer.py
Однако потоковая передача Haddop не выполняется.
Jadoop Jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming*.jar -input shakespeare -output InvertedList -mapper InvertedMapper.py -reducer InvertedReducer.py -file InvertedMapper.py -file InvertedReducer.py
#MAPPER CODE
#!/usr/bin/env python
import sys
import os
class Mapper(object):
def __init__(self, stream, sep='\t'):
self.stream=stream
self.sep=sep
def __iter__(self):
os.chdir(self.stream.read().strip())
files = [os.path.abspath(f) for f in os.listdir(".")]
for file in files:
yield file
def emit(self, key, value):
sys.stdout.write("{0}{1}{2}\n".format(key,self.sep,value))
def map(self):
for file in self:
with open(file) as infile:
name = file.split("/")[-1].split(".")[0]
words = infile.read().strip().split()
for word in words:
self.emit(word,name)
if __name__ == "__main__":
cwd = os.getcwd()
mapper = Mapper(sys.stdin)
mapper.map()
os.chdir(cwd)
#REDUCER CODE
#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter
class Reducer(object):
def __init__(self, stream, sep="\t"):
self.stream = stream
self.sep = sep
def __iter__(self):
for line in self.stream:
try:
parts = line.strip().split(self.sep)
yield parts[0], parts[1]
except:
continue
def emit(self, key, value):
sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))
def reduce(self):
for key, group in groupby(self, itemgetter(0)):
values = []
for item in group:
values.append(item[1])
values = set(values)
values = list(values)
self.emit(key, values)
if __name__ == "__main__":
reducer = Reducer(sys.stdin)
reducer.reduce()
Вывод при запуске команды Hadoop приведен ниже.
packageJobJar: [InvertedMapper1.py, /tmp/hadoop-training/hadoop-unjar281431668511629942/] [] /tmp/streamjob679048425003800890.jar tmpDir=null
19/02/17 00:22:19 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
19/02/17 00:22:19 INFO mapred.FileInputFormat: Total input paths to process : 5
19/02/17 00:22:20 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-hdfs/cache/training/mapred/local]
19/02/17 00:22:20 INFO streaming.StreamJob: Running job: job_201902041621_0051
19/02/17 00:22:20 INFO streaming.StreamJob: To kill this job, run:
19/02/17 00:22:20 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201902041621_0051
19/02/17 00:22:20 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201902041621_0051
19/02/17 00:22:21 INFO streaming.StreamJob: map 0% reduce 0%
19/02/17 00:22:34 INFO streaming.StreamJob: map 40% reduce 0%
19/02/17 00:22:39 INFO streaming.StreamJob: map 0% reduce 0%
19/02/17 00:22:50 INFO streaming.StreamJob: map 40% reduce 0%
19/02/17 00:22:53 INFO streaming.StreamJob: map 0% reduce 0%
19/02/17 00:23:03 INFO streaming.StreamJob: map 40% reduce 0%
19/02/17 00:23:06 INFO streaming.StreamJob: map 20% reduce 0%
19/02/17 00:23:07 INFO streaming.StreamJob: map 0% reduce 0%
19/02/17 00:23:16 INFO streaming.StreamJob: map 20% reduce 0%
19/02/17 00:23:17 INFO streaming.StreamJob: map 40% reduce 0%
19/02/17 00:23:19 INFO streaming.StreamJob: map 20% reduce 0%
19/02/17 00:23:21 INFO streaming.StreamJob: map 100% reduce 100%
19/02/17 00:23:21 INFO streaming.StreamJob: To kill this job, run:
19/02/17 00:23:21 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201902041621_0051
19/02/17 00:23:21 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201902041621_0051
19/02/17 00:23:21 ERROR streaming.StreamJob: Job not successful. Error: NA
19/02/17 00:23:21 INFO streaming.StreamJob: killJob...
Streaming Command Failed!