Hadoop - Соединение с несколькими входами, вероятно, пропускает Reducer - PullRequest
0 голосов
/ 27 января 2012

Итак, я хочу выполнить сокращение стороны соединения с MR.(Нет Hive или что-то еще, я экспериментирую с ванильным Hadoop Atm).

У меня есть 2 входных файла, сначала идет так:
12 13
12 15
12 16
12 23

второй - просто 12 1000.

Поэтому я назначаю каждый файл отдельному преобразователю, который фактически помечает каждую пару значений ключа 0 или 1 в зависимости от исходного файла.,И это хорошо работает.Как я могу сказать?Я получаю MapOutput как ожидалось:

|ключ || значение |
12 0 1000
12 1 13
12 1 15
12 1 16 и т. д.

Разделы My Partitioner на основе первой части ключа (т. е. 12).Редуктор должен включиться ключом.Тем не менее, кажется, что работа пропускает шаг сокращения.

Интересно, что-то не так с моим драйвером?

Мой код (Hadoop v0.22, но те же результаты с 0.20.2 с дополнительнымиlibs из багажника):

Mappers

public static class JoinDegreeListMapper extends
        Mapper<Text, Text, TextPair, Text> {
    public void map(Text node, Text degree, Context context)
            throws IOException, InterruptedException {

        context.write(new TextPair(node.toString(), "0"), degree);

    }
}

public static class JoinEdgeListMapper extends
        Mapper<Text, Text, TextPair, Text> {
    public void map(Text firstNode, Text secondNode, Context context)
            throws IOException, InterruptedException {

        context.write(new TextPair(firstNode.toString(), "1"), secondNode);

    }
}

Редуктор

public static class JoinOnFirstReducer extends
        Reducer<TextPair, Text, Text, Text> {
    public void reduce(TextPair key, Iterator<Text> values, Context context)
            throws IOException, InterruptedException {

        context.progress();
        Text nodeDegree = new Text(values.next());
        while (values.hasNext()) {
            Text secondNode = values.next();
            Text outValue = new Text(nodeDegree.toString() + "\t"
                    + secondNode.toString());
            context.write(key.getFirst(), outValue);
        }
    }
}

Разделитель

public static class JoinOnFirstPartitioner extends
        Partitioner<TextPair, Text> {

    @Override
    public int getPartition(TextPair key, Text Value, int numOfPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numOfPartitions;
    }
}

Драйвер

public int run(String[] args) throws Exception {


    Path edgeListPath = new Path(args[0]);
    Path nodeListPath = new Path(args[1]);
    Path outputPath = new Path(args[2]);

    Configuration conf = getConf();

    Job job = new Job(conf);
    job.setJarByClass(JoinOnFirstNode.class);
    job.setJobName("Tag first node with degree");

    job.setPartitionerClass(JoinOnFirstPartitioner.class);
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);
    //job.setSortComparatorClass(TextPair.FirstComparator.class);
    job.setReducerClass(JoinOnFirstReducer.class);

    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);


    MultipleInputs.addInputPath(job, edgeListPath, EdgeInputFormat.class,
            JoinEdgeListMapper.class);
    MultipleInputs.addInputPath(job, nodeListPath, EdgeInputFormat.class,
            JoinDegreeListMapper.class);

            FileOutputFormat.setOutputPath(job, outputPath);


    return job.waitForCompletion(true) ? 0 : 1;

}

1 Ответ

0 голосов
/ 28 января 2012

В моей функции сокращения был Iterator <> вместо Iterable, поэтому задание было пропущено до Identity Reducer.
Не могу поверить, что я это упустил.Ошибка Noob.

И ответ получен из этого Q / A При первом использовании Hadoop задание MapReduce не запускается Reduce Phase

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...