Один вариант, включающий одно задание MR: при условии, что исходный набор данных выглядит как [узел1, узел2]:
-mapper считывает исходный набор данных и испускает тройки [узел1, узел2, выход] и [узел2, узел1,in] для каждой строки
-reducer получает тройки из маппера в виде [ключ, узел, метка], вычисляет выходную степень и степень, подсчитывая метки «out» и метки «in» отдельно для каждой клавиши и выводит ихв форме [key, indegree, outdegree]
Реализация будет выглядеть примерно так, как показано ниже (при условии, что node1
и node2
в вашем наборе данных разделены пробелом, а также при условии, что набор данных содержит только отдельные пары):
Mapper:
public class YourMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String[] line_spl = line.split(" ");
String node1 = line_spl[0];
String node2 = line_spl[1];
Text node1_txt = new Text(node1);
Text node2_txt = new Text(node2);
Text emit_out = new Text("out_" + node2);
Text emit_in = new Text("in_" + node1);
output.collect(node1_txt, emit_out);
output.collect(node2_txt, emit_in );
}//end map function
}//end mapper class
Редуктор:
public class YourReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
int count_outs = 0;
int count_ins = 0;
while (values.hasNext()) {
Text value = (Text) values.next();
String value_str = value.toString();
if(value_str.startsWith("out_"))
count_outs++;
else
if(value_str.startsWith("in_"))
count_ins++;
}
Text out = new Text(count_ins + " " + count_outs);
output.collect(key, out);
}//end reduce function
}//end reducer class