I have a simple Hadoop application that takes a single CSV file, splits each record on ",", and then counts the occurrences of the first field.
Below is my code.
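For illustration only (hypothetical sample data, not my actual file), the intended behavior is that input like

apple,red,5
banana,yellow,2
apple,green,7

should produce counts of the first field, e.g.

apple    2
banana   1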
package com.bluedolphin;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJob extends Configured implements Tool {
    private final static LongWritable one = new LongWritable(1);

    public static class MapClass extends Mapper<Object, Text, Text, LongWritable> {
        private Text word = new Text();

        public void map(Object key,
                        Text value,
                        OutputCollector<Text, LongWritable> output,
                        Reporter reporter) throws IOException, InterruptedException {
            String[] citation = value.toString().split(",");
            word.set(citation[0]);
            output.collect(word, one);
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(
                Text key,
                Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }

    public static class Combiner extends Reducer<Text, IntWritable, Text, LongWritable> {
        public void reduce(
                Text key,
                Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException, InterruptedException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MyJob");
        job.setJarByClass(MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        // job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);
        // job.setInputFormatClass(KeyValueInputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(KeyValueOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String args[]) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}
This is the error:
11/12/16 22:16:58 INFO mapred.JobClient: Task Id : attempt_201112161948_0005_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1013)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)