Картограф, обрабатывающий разное количество строк - PullRequest
0 голосов
/ 22 марта 2019

Итак, сегодня я заметил странное поведение моего кода уменьшения карты.Потратил 3 часа, пытаясь разобраться, все равно ничего не получил.

Я пытаюсь найти ответы на 3 простых вопроса на основе этого набора данных:

The Academy Awards, 1927-2015

  • Сколько наград было дано в конкретный год?
  • Какой актер / актриса получил наибольшее количество наград в целом?
  • Какой фильм получил наибольшее количество наград на церемонии?

Я написал код сокращения карты и заметил, что картограф работает для разного числа строк для каждого кода.

Для строк Q1 - 3251, строк Q2 - 3251 и Q3 - 33!

Я не понимаю, почему это происходит.

Драйвер

public static void main(String[] args) throws Exception {

    String inputFilePath = "./database.csv";
    String outputFilePath = "./<BASED_ON_QUESTION>";

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "<BASED_ON_QUESTION>");
    job.setJarByClass(YearlyAwards.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(inputFilePath));

    try {
        File f = new File(outputFilePath);
        FileUtils.forceDelete(f);
    } catch (Exception e) {

    }

    FileOutputFormat.setOutputPath(job, new Path(outputFilePath));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Картограф

Q1

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    Integer count = 0;

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String[] quoteLessVal = value.toString().split("\"");
        value = new Text(String.join("", quoteLessVal));
        String[] values = value.toString().split(",");
        String year = values[0];
        String win = values[3];
        count += 1;

        if (!win.equals("")) {
            context.write(new Text(year), new IntWritable(new Integer(win)));
        }
    }

    @Override
    protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        super.cleanup(context);
        System.out.println(count);
    }
}

Q1 Output

Q2

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    Integer count = 0;

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        count += 1;

        String[] quoteLessVal = value.toString().split("\"");
        value = new Text(String.join("", quoteLessVal));
        String[] values = value.toString().split(",");
        String name = values[4];
        String win = values[3];

        if (!win.equals("")) {
            context.write(new Text(name), new IntWritable(new Integer(win)));
        }
    }

    @Override
    protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        super.cleanup(context);
        System.out.println(count);
    }
}

Q2 Output

Q3

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    Integer count = 0;

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        count += 1;

        String[] quoteLessVal = value.toString().split("\"");
        value = new Text(String.join("", quoteLessVal));
        String[] values = value.toString().split(",");
        String name = values[5];
        String win = values[3];
        count += 1;

        if (!win.equals("")) {
            context.write(new Text(name), new IntWritable(new Integer(win)));
        }
    }

    @Override
    protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        super.cleanup(context);
        System.out.println(count);
    }
}

Q3 Output

Редуктор

(довольно стандартный для всех Qs)

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        Integer count = 0;

        for (IntWritable val : values) {
            count += 1;
        }
        System.out.println(key + " > " + count);

        context.write(key, new IntWritable(count));
    }
}

Я думаю, что выполнение кода по какой-то причине останавливается,поскольку я не получаю никакого выходного файла (part-r-00000) в выходной папке!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...