Здравствуйте, я пишу программу mapreduce для вычисления разницы в степени узла между двумя столбцами.
Мои данные выглядят так:
2,12,31,23,14,2
Я хочу посчитать вхождения ключа в col1 и вхождения ключа в col2 и принять разницу.Вывод должен выглядеть так:
1, -12,03,04,1
Я написал следующий код, но при его запуске вывод пуст.Может ли кто-нибудь помочь мне понять, что я делаю неправильно?
public static class DegreeMapper extends Mapper<LongWritable, Text, Text, ArrayPrimitiveWritable>
{
private IntWritable weight = new IntWritable();
private final static IntWritable one = new IntWritable(1);
private IntWritable node = new IntWritable();
public void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer tok = new StringTokenizer(value.toString(), ",");
Text col1 = new Text(tok.nextToken());
context.write(col1, new ArrayPrimitiveWritable(new int[]{1,0}));
// context.write(col1, toArray(1, 0));
Text col2 = new Text(tok.nextToken());
context.write(col2, new ArrayPrimitiveWritable( new int[]{0,1}));
// context.write(col2, toArray(0, 1) );
}
private ArrayPrimitiveWritable toArray(int v1, int v2){
return new ArrayPrimitiveWritable( new int[]{v1, v2} );
}
}
public static class DegreeReducer extends Reducer<Text, ArrayPrimitiveWritable, Text, Text>
{
private Text result = new Text();
public void reduce(Text key, Iterable<ArrayPrimitiveWritable> values, Context context) throws IOException, InterruptedException
{
Iterator<ArrayPrimitiveWritable> i = values.iterator();
int count = 0;
while (i.hasNext() ){
int[] counts = (int[])i.next().get();
count += counts[0];
count -= counts[1];
}
context.write(key, new Text("" + count));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job1 = Job.getInstance(conf, "Q1");
/* TODO: Needs to be implemented */
job1.setJarByClass(Q1.class);
job1.setMapperClass(DegreeMapper.class);
job1.setReducerClass(DegreeReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
System.exit(job1.waitForCompletion(true) ? 0 : 1);
}