I'm trying to run a Java program that counts words in an input file using MapReduce in Hadoop. I'm on Windows 10 with the Eclipse IDE. I get a FileNotFoundException as soon as the reducer starts executing; the mapper runs to completion. Please help me resolve this. I've been stuck on it for a while.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CountMax {

    // Emits (word, 1) for every token in each input line.
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text word = new Text();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
            System.out.println("In mapper");
        }
    }

    // Tracks the word with the highest count and emits it once, in cleanup().
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private int maxCount = 0;
        private String maxCountWord = "";

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            System.out.println("in SETUP");
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            if (sum > maxCount) {
                maxCountWord = key.toString();
                maxCount = sum;
                System.out.println(sum);
                System.out.println(key);
            }
            System.out.println(maxCountWord);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text(maxCountWord), new IntWritable(maxCount));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(CountMax.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path inp = new Path("C:/input.txt");
        Path out = new Path("C:/output");
        FileInputFormat.addInputPath(job, inp);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Here is what I get in the console:
2019-10-07 00:07:39,461 INFO mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local2096667908_0001_m_000000_0
2019-10-07 00:07:39,461 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2019-10-07 00:07:39,463 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks
2019-10-07 00:07:39,463 INFO mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local2096667908_0001_r_000000_0
2019-10-07 00:07:39,472 INFO output.FileOutputCommitter (FileOutputCommitter.java:<init>(108)) - File Output Committer Algorithm version is 1
2019-10-07 00:07:39,472 INFO util.ProcfsBasedProcessTree (ProcfsBasedProcessTree.java:isAvailable(192)) - ProcfsBasedProcessTree currently is supported only on Linux.
2019-10-07 00:07:39,499 INFO mapred.Task (Task.java:initialize(614)) - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@7734a2ef
2019-10-07 00:07:39,501 INFO mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@18daa432
2019-10-07 00:07:39,509 INFO reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(205)) - MergerManager: memoryLimit=1314232704, maxSingleShuffleLimit=328558176, mergeThreshold=867393600, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2019-10-07 00:07:39,510 INFO reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local2096667908_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2019-10-07 00:07:39,528 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2019-10-07 00:07:39,532 WARN mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local2096667908_0001
java.lang.Exception: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in localfetcher#1
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in localfetcher#1
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: C:/tmp/hadoop-SahilJ%20PC/mapred/local/localRunner/SahilJ%20PC/jobcache/job_local2096667908_0001/attempt_local2096667908_0001_m_000000_0/output/file.out.index
at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:200)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:768)
at org.apache.hadoop.io.SecureIOUtils.openFSDataInputStream(SecureIOUtils.java:155)
at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:71)
at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:62)
at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:57)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.copyMapOutput(LocalFetcher.java:124)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.doCopy(LocalFetcher.java:102)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.run(LocalFetcher.java:85)
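One detail that stands out to me: the path Hadoop fails to open contains SahilJ%20PC, which looks like the space in my Windows user name ("SahilJ PC") being URL-encoded somewhere in the local shuffle path. As a minimal sketch of a workaround I'm considering (assuming the space really is the problem, and that hadoop.tmp.dir is the right property for redirecting the local scratch directory; C:/hadooptmp is just a made-up example path), the job could be configured like this before it is created:

Configuration conf = new Configuration();
// Assumption: pointing Hadoop's local scratch space at a directory whose
// path contains no spaces avoids the URL-encoded "%20" seen in the stack
// trace. "C:/hadooptmp" is a hypothetical space-free directory.
conf.set("hadoop.tmp.dir", "C:/hadooptmp");
Job job = Job.getInstance(conf, "word count");

Is that a sensible way to work around this, or is there a proper fix?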