Итак, я хочу выполнить сокращение стороны соединения с MR.(Нет Hive или что-то еще, я экспериментирую с ванильным Hadoop Atm).
У меня есть 2 входных файла, сначала идет так:
12 13
12 15
12 16
12 23
второй - просто 12 1000.
Поэтому я назначаю каждый файл отдельному преобразователю, который фактически помечает каждую пару значений ключа 0 или 1 в зависимости от исходного файла.,И это хорошо работает.Как я могу сказать?Я получаю MapOutput как ожидалось:
|ключ || значение |
12 0 1000
12 1 13
12 1 15
12 1 16 и т. д.
Разделы My Partitioner на основе первой части ключа (т. е. 12).Редуктор должен включиться ключом.Тем не менее, кажется, что работа пропускает шаг сокращения.
Интересно, что-то не так с моим драйвером?
Мой код (Hadoop v0.22, но те же результаты с 0.20.2 с дополнительнымиlibs из багажника):
Mappers
public static class JoinDegreeListMapper extends
Mapper<Text, Text, TextPair, Text> {
public void map(Text node, Text degree, Context context)
throws IOException, InterruptedException {
context.write(new TextPair(node.toString(), "0"), degree);
}
}
public static class JoinEdgeListMapper extends
Mapper<Text, Text, TextPair, Text> {
public void map(Text firstNode, Text secondNode, Context context)
throws IOException, InterruptedException {
context.write(new TextPair(firstNode.toString(), "1"), secondNode);
}
}
Редуктор
public static class JoinOnFirstReducer extends
Reducer<TextPair, Text, Text, Text> {
public void reduce(TextPair key, Iterator<Text> values, Context context)
throws IOException, InterruptedException {
context.progress();
Text nodeDegree = new Text(values.next());
while (values.hasNext()) {
Text secondNode = values.next();
Text outValue = new Text(nodeDegree.toString() + "\t"
+ secondNode.toString());
context.write(key.getFirst(), outValue);
}
}
}
Разделитель
public static class JoinOnFirstPartitioner extends
Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text Value, int numOfPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numOfPartitions;
}
}
Драйвер
public int run(String[] args) throws Exception {
Path edgeListPath = new Path(args[0]);
Path nodeListPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
Configuration conf = getConf();
Job job = new Job(conf);
job.setJarByClass(JoinOnFirstNode.class);
job.setJobName("Tag first node with degree");
job.setPartitionerClass(JoinOnFirstPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
//job.setSortComparatorClass(TextPair.FirstComparator.class);
job.setReducerClass(JoinOnFirstReducer.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(job, edgeListPath, EdgeInputFormat.class,
JoinEdgeListMapper.class);
MultipleInputs.addInputPath(job, nodeListPath, EdgeInputFormat.class,
JoinDegreeListMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}