Mapreduce разница в степени узла - PullRequest
0 голосов
/ 18 октября 2018

Здравствуйте, я пишу программу mapreduce для вычисления разницы в степени узла между двумя столбцами.

Мои данные выглядят так:

2,12,31,23,14,2

Я хочу посчитать вхождения ключа в col1 и вхождения ключа в col2 и принять разницу.Вывод должен выглядеть так:

1, -12,03,04,1

Я написал следующий код, но при его запуске вывод пуст.Может ли кто-нибудь помочь мне понять, что я делаю неправильно?

public static class DegreeMapper extends Mapper<LongWritable, Text, Text, ArrayPrimitiveWritable> 
{
    private IntWritable weight = new IntWritable();
    private final static IntWritable one = new IntWritable(1);
    private IntWritable node = new IntWritable();

    public void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException 
    {
        StringTokenizer tok = new StringTokenizer(value.toString(), ",");

        Text col1 = new Text(tok.nextToken());
        context.write(col1, new ArrayPrimitiveWritable(new int[]{1,0}));
        // context.write(col1, toArray(1, 0));

        Text col2 = new Text(tok.nextToken());  
        context.write(col2, new ArrayPrimitiveWritable( new int[]{0,1}));
        // context.write(col2, toArray(0, 1) );
    }

    private ArrayPrimitiveWritable toArray(int v1, int v2){     
        return new ArrayPrimitiveWritable( new int[]{v1, v2} );
    } 
}  

public static class DegreeReducer extends Reducer<Text, ArrayPrimitiveWritable, Text, Text> 
{
    private Text result = new Text();
    public void reduce(Text key, Iterable<ArrayPrimitiveWritable> values, Context context) throws IOException, InterruptedException 
    {

        Iterator<ArrayPrimitiveWritable> i = values.iterator();
        int count = 0;
        while (i.hasNext() ){
          int[] counts = (int[])i.next().get();
          count += counts[0];
          count -= counts[1];
        }
        context.write(key, new Text("" + count));

    }
}


public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job1 = Job.getInstance(conf, "Q1");

    /* TODO: Needs to be implemented */
    job1.setJarByClass(Q1.class);
    job1.setMapperClass(DegreeMapper.class);
    job1.setReducerClass(DegreeReducer.class);

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path(args[1]));

    System.exit(job1.waitForCompletion(true) ? 0 : 1);

}
...