Нахождение максимального значения и возвращение строки в карте уменьшает - PullRequest
0 голосов
/ 15 октября 2019

У меня есть файл со следующим содержанием: Город 1 Город 2 Количество поездок

330 11 10

10 20 12

11 130 19

11 200 80

330 70 10

, и у меня возникают проблемы с получением строк с максимальным количеством поездок для каждого города 1. Я сопоставил ключи для города 1 изначения должны быть строками City2, # of Trips, и я пытаюсь закодировать свое уменьшение так, чтобы результат выглядел следующим образом: 330 11,10

10 20,12

11 200, 80

Вот мой код:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Q1 {
    public static class TokenizerMapper
           extends Mapper<Object, Text, Text, Text>{

        private Text src = new Text();
        private Text trgt_wght = new Text();
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          String oneLine = value.toString();
          String[] parts = oneLine.split("\t");
          src.set(parts[0]);
          String join = parts[1]+","+parts[2];
          trgt_wght.set(join);
          context.write(src,trgt_wght);
        }
      }
public static class IntSumReducer
       extends Reducer<Text,Text,Text,Text> {
    private Text result = new Text();

    public void reduce(Text key, Text values,
                       Context context
                       ) throws IOException, InterruptedException {
      int max = 0;
      String trgt_wght2 = values.toString();
      String[] parts2 = trgt_wght2.split(",\t");
      for (String val : parts2) {
        if(max < Integer.parseInt(String.valueOf(parts2[0]))){
                max = Integer.parseInt(String.valueOf(parts2[0]));
            result.set(parts2[0]+","+parts2[1]);
            context.write(key, result);
        }
    }
  }
}  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Q1");

    /* TODO: Needs to be implemented */
    job.setJarByClass(Q1.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Сейчас он просто выводит вот так: 330 11,10

10 20,12

11130,19

11 200,80

330 70,10

...