У меня есть набор данных о 1 млн. Патентов, и я хотел бы найти среднее значение "претензий" на патент в 4-х группах / категории. Например, до 1970, 1970-1979, 1980-1989, 1990-1999. «Патенты» имеют только патенты с GYEAR с 1975 по 1998 год.
Ниже показано, как выглядит мой набор данных в грубой идее ..
"ПАТЕНТ", "ГОД", "ГДАТ", "APPYEAR", "СТРАНА", "ПОСТАТ", " ASSIGNEE "," ASSCODE "," CLAIMS ",".
Ниже приведен мой результат, пока мне удается только найти средние "претензии" по патентам по каждому году, но я на самом деле пытаюсь найти среднее по годам Я думаю, что мой класс Редуктора неверен. Результат моего вывода пока ... PLS HELP!
public class new3 extends Configured implements Tool
{
//Mapper class which inherits super class mapper
//Mapper class takes four arguments Mapper<InputKey,InputValue,OutputKey,OutputValue>
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
Text key2 = new Text();// to contain GYEAR
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String sValue = value.toString();
String [] tokens = sValue.split(",",-1);
key2.set(tokens[1]);//GYEAR is extracted to key2
context.write(key2, value); //GYEAR is in key2, GYEAR and whole set of data is passed on
}
}
//Reducer class which inherits super class Reducer
//Reducer class takes four arguments Reducer<InputKey,InputValue,OutputKey,OutputValue>
public static class Reduce extends Reducer<Text, Text, Text, IntWritable>
{
public void reduce(Text key, Iterable<Text>values, Context context)
throws IOException, InterruptedException
{
int total = 0;
int counter = 0;
int average;
int claims = 0;
String st;
for(Text val : values)
{
String [] str = val.toString().split(",");
st = str[8];
try{
claims = Integer.parseInt(st);
}
catch(Exception e)
{
if(st==null)
{
st = "0";
claims = Integer.parseInt(st);
}
}
total = total + claims;
counter++;
}
average = (total/counter);
context.write(new Text(key), new IntWritable(average));
}
}
//Partitioner class which inherits Super class partitioner
public static class Partition extends Partitioner<Text, Text>
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
if(numReduceTasks==0)
{
return 0;
}
if (key.equals(new Text("1963")) || key.equals(new Text("1964")) ||
key.equals(new Text("1965")) || key.equals(new Text("1966")) ||
key.equals(new Text("1967")) || key.equals(new Text("1968")) ||
key.equals(new Text("1969")))
{
return 0;
}
else if (key.equals(new Text("1970")) || key.equals(new Text("1971")) ||
key.equals(new Text("1972")) || key.equals(new Text("1973")) ||
key.equals(new Text("1974")) || key.equals(new Text("1975")) ||
key.equals(new Text("1976")) || key.equals(new Text("1977")) ||
key.equals(new Text("1978")) || key.equals(new Text("1979")))
{
return 1;
}
else if (key.equals(new Text("1980")) || key.equals(new Text("1981")) ||
key.equals(new Text("1982")) || key.equals(new Text("1983")) ||
key.equals(new Text("1984")) || key.equals(new Text("1985")) ||
key.equals(new Text("1986")) || key.equals(new Text("1987")) ||
key.equals(new Text("1988")) || key.equals(new Text("1989")))
{
return 2;
}
else
{
return 3;
}
/*
String [] str = value.toString().split(",");
int year = Integer.parseInt(str[1]);
if(numReduceTasks == 0)
{
return 0;
}
if(year < 1970)
{
return 0;
}
else if(year >= 1970 && year <= 1979)
{
return 1 % numReduceTasks;
}
else if(year >= 1980 && year <= 1989)
{
return 2 % numReduceTasks;
}
else
{
return 3 % numReduceTasks;
}
*/
}
}
//The "run" method of ToolRunner
@Override
public int run(String[]args) throws Exception
{
Configuration conf = this.getConf(); //new Configuration();
Job job = Job.getInstance(conf, "FinalA1T3");
job.setJarByClass(new3.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(args[1]);
//Configuring the input/output path from the filesystem into the job
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(4);
//Deleting the output path automatically from the HDFS so that we dont have to delete it explicitly
outputPath.getFileSystem(conf).delete(outputPath, true);
//Exiting the job only if the flag value becomes false
return job.waitForCompletion(true) ? 0 : 1;
}
// the main driver method
public static void main(String[]args) throws Exception
{
int exitCode = ToolRunner.run(new Configuration(), new new3(), args);
System.exit(exitCode);
}
}