Я пытаюсь запустить простой пример с использованием двоичного исполняемого файла и кэшированного архива, и он, похоже, не работает:
В примере, который я пытаюсь запустить, есть преобразователь, который генерирует три случайных двойных числа иключ и редуктор будут усреднять эти три числа вместе и записывать среднее значение.Очень простые вещи.Я написал простой EXE-файл в c и генерирую случайные числа:
#include <cstdio>
#include <stdlib.h>
#include <time.h>
int main(int argc, char*argv[]) {
srand ( time(NULL) );
int rand1 = rand() % 10 + 1;
int rand2 = rand() % 10 + 1;
int rand3 = rand() % 10 + 1;
printf("%s, %f, %f, %f", argv[1], (float)rand1/5, (float)rand2/5, (float)rand3/5);
return 0;
}
, поэтому, если я позвоню ./a.out [ключ]
я увижу ключ
, random1, random2, random3
im с использованием потоковой передачи Python, и вот мой картограф, написанный на Python:
#!/usr/bin/python
import os
import sys
import random
import shlex, subprocess
from subprocess import PIPE
from misc import *
for line in sys.stdin:
line = line.strip()
sline = line.split(',')
# parse out the au and cost from the line ....
key = int( sline[0] )
au = int( sline[1])
cost = float( sline[2] )
command = "./a.out %d" % ( key )
cli_parts = shlex.split(command)
mp = subprocess.Popen(cli_parts, stdin=PIPE, stderr=PIPE,
stdout=PIPE)
print mp.communicate()[0].strip('\r\n')
вот редуктор, который будет просто делатьусреднение:
#!/usr/bin/python
import os
import sys
import math
import re
from misc import *
for line in sys.stdin:
line = line.strip()
m = re.match("(\w+),\s+(.*),\s+(.*),\s+(.*)", line)
if m:
average = (float(m.groups(0)[1]) + float(m.groups(0)[2]) +
float(m.groups(0)[3])) / 3
print "key: %d average: %f" % ( int(m.groups(0)[0]), average )
else:
print "not found"
#f.close()
поэтому после прочтения документации мне кажется, что мне нужно скомпилировать бинарный файл и tar.gz-it
1) tar cvaf a.out.tar.gz a.out
Теперь я должен быть в состоянии передать это датоданным через параметр - cacheArchive, и все должно работать нормально.Вот моя команда Hadoop:
Jadoop Jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop- streaming-0.20.2 + 737.jar \ -numReduceTasks 1 \ -mapper mapper1.py \ -file mapper1.py \ -reducer reducer1.py \ -file reducer1.py \ -file misc.py \ -cacheArchive a.out.tar.gz \ -input input / * \ -output testsvmoutput \ -verbose
Излишне говорить, что это не работает, и похоже, потому что маппер не генерирует данные.
Я подтвердил, что мой код работает, протестировав его в командной строке:
cat input /svminput1.txt |python mapper1.py |сортировать |python reducer1.py
Я хотел бы, чтобы кто-нибудь объяснил, почему это не работает, как передача exe-команды с помощью команды cacheArchive работает на датоделях, и / или как отладить это, поскольку сообщения об ошибках выходят изHTML-панель Cloudera не очень полезна.
Спасибо
Вот ошибка, которую я вижу:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:317)
at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
at org.apache.hadoop.mapred.Child.main(Child.java:211)