Как сгруппировать набор данных в mapreduce - PullRequest
0 голосов
/ 23 апреля 2020

У меня есть набор данных о 1 млн. Патентов, и я хотел бы найти среднее значение "претензий" на патент в 4-х группах / категории. Например, до 1970, 1970-1979, 1980-1989, 1990-1999. «Патенты» имеют только патенты с GYEAR с 1975 по 1998 год.

Ниже показано, как выглядит мой набор данных в грубой идее ..

"ПАТЕНТ", "ГОД", "ГДАТ", "APPYEAR", "СТРАНА", "ПОСТАТ", " ASSIGNEE "," ASSCODE "," CLAIMS ",".

Ниже приведен мой результат, пока мне удается только найти средние "претензии" по патентам по каждому году, но я на самом деле пытаюсь найти среднее по годам Я думаю, что мой класс Редуктора неверен. Результат моего вывода пока ... PLS HELP!

public class new3 extends Configured implements Tool
{
//Mapper class which inherits super class mapper
//Mapper class takes four arguments Mapper<InputKey,InputValue,OutputKey,OutputValue>
public static class Map extends Mapper<LongWritable, Text,  Text, Text>
{
    Text key2 = new Text();// to contain GYEAR

    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
        String sValue = value.toString();
        String [] tokens = sValue.split(",",-1);

        key2.set(tokens[1]);//GYEAR is extracted to key2

        context.write(key2, value); //GYEAR is in key2, GYEAR and whole set of data is passed on
    }
}


//Reducer class which inherits super class Reducer
//Reducer class takes four arguments Reducer<InputKey,InputValue,OutputKey,OutputValue>
public static class Reduce extends Reducer<Text, Text, Text, IntWritable>
{

    public void reduce(Text key, Iterable<Text>values, Context context)
    throws IOException, InterruptedException
    {
        int total = 0;
        int counter = 0;
        int average;
        int claims = 0;
        String st;

        for(Text val : values)
        {   
            String [] str = val.toString().split(",");
            st = str[8];
            try{
                claims = Integer.parseInt(st);
            }
            catch(Exception e)
            {
                if(st==null)
                {
                    st = "0";
                    claims = Integer.parseInt(st);
                }
            }               

            total = total + claims;
            counter++;
        }



        average = (total/counter);
        context.write(new Text(key), new IntWritable(average));
    }
}

//Partitioner class which inherits Super class partitioner
public static class Partition extends Partitioner<Text, Text>
{
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks)
    {

        if(numReduceTasks==0)
        {
            return 0;
        }
        if (key.equals(new Text("1963")) || key.equals(new Text("1964")) || 
            key.equals(new Text("1965")) || key.equals(new Text("1966")) || 
            key.equals(new Text("1967")) || key.equals(new Text("1968")) || 
            key.equals(new Text("1969")))
        {
            return 0;
        }

        else if (key.equals(new Text("1970")) || key.equals(new Text("1971")) || 
                 key.equals(new Text("1972")) || key.equals(new Text("1973")) || 
                 key.equals(new Text("1974")) || key.equals(new Text("1975")) || 
                 key.equals(new Text("1976")) || key.equals(new Text("1977")) || 
                 key.equals(new Text("1978")) || key.equals(new Text("1979")))
        {
            return 1;
        }

        else if (key.equals(new Text("1980")) || key.equals(new Text("1981")) || 
                 key.equals(new Text("1982")) || key.equals(new Text("1983")) || 
                 key.equals(new Text("1984")) || key.equals(new Text("1985")) || 
                 key.equals(new Text("1986")) || key.equals(new Text("1987")) || 
                 key.equals(new Text("1988")) || key.equals(new Text("1989")))
        {
            return 2;
        }

        else
        {
            return 3;
        }

        /*
        String [] str = value.toString().split(",");
        int year = Integer.parseInt(str[1]);

        if(numReduceTasks == 0)
        {
            return 0;
        }
        if(year < 1970)
        {
            return 0;
        }
        else if(year >= 1970 && year <= 1979)
        {
            return 1 % numReduceTasks;
        }
        else if(year >= 1980 && year <= 1989)
        {
            return 2 % numReduceTasks;
        }
        else
        {
            return 3 % numReduceTasks;
        }
        */
    }

}

//The "run" method of ToolRunner
@Override
public int run(String[]args) throws Exception
{
    Configuration conf = this.getConf(); //new Configuration();

    Job job = Job.getInstance(conf, "FinalA1T3");

    job.setJarByClass(new3.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setPartitionerClass(Partition.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    Path outputPath = new Path(args[1]);

    //Configuring the input/output path from the filesystem into the job
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setNumReduceTasks(4);

    //Deleting the output path automatically from the HDFS so that we dont have to delete it explicitly
    outputPath.getFileSystem(conf).delete(outputPath, true);

    //Exiting the job only if the flag value becomes false
    return job.waitForCompletion(true) ? 0 : 1;
}

// the main driver method
public static void main(String[]args) throws Exception
{       
    int exitCode = ToolRunner.run(new Configuration(), new new3(), args);
    System.exit(exitCode);
}

}

...