пример подсчета mapreduce - PullRequest
8 голосов
/ 29 мая 2011

Мой вопрос о mapreduce programming in java.

Предположим, у меня есть пример WordCount.java, стандартный mapreduce program.Я хочу, чтобы функция карты собирала некоторую информацию и возвращалась к картам функций уменьшения, сформированным как: <slaveNode_id,some_info_collected>,

, так что I can know what slave node collected what data .. Любая идея, как ??

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
      }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(Map.class);
      conf.setCombinerClass(Reduce.class);
      conf.setReducerClass(Reduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
    }
}

Спасибо !!

Ответы [ 2 ]

5 голосов
/ 29 мая 2011

То, что вы просите, - это сообщить приложению (вашей карте) о инфраструктуре, на которой оно работало.

В целом ответ таков: вашему приложению эта информация не нужна. Каждый вызов Mapper и каждый вызов Reducer могут выполняться на другом узле или на одном и том же узле. Прелесть MapReduce в том, что результат одинаков для вашего приложения: это не имеет значения.

Как следствие, API не имеет функций для поддержки вашего запроса.

Получайте удовольствие от изучения Hadoop:)


P.S. Единственный способ, которым я могу придумать (что неприятно, если не сказать больше), состоит в том, что вы включаете какой-то системный вызов в Mapper и спрашивает базовую ОС о ее имени / свойствах / и т.д. Такая конструкция сделает ваше приложение очень непереносимым; то есть он не будет работать на Hadoop в Windows или Amazon.

1 голос
/ 29 мая 2011

Wordcount - неправильный пример для вас.Вы хотите просто объединить всю информацию вместе.Это инвертирует вещи в wordcount.

По сути, вы просто излучаете свой slaveNode_id как IntWritable (если это возможно), а информацию как Text.

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,IntWritable, Text> {
    private Text word = new Text();

  public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      // you have to split your data here: ID and value
      IntWritable id = new IntWritable(YOUR_ID_HERE);

      output.collect(id, word);
    }
  }
}

ИРедуктор будет идти так же:

 public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text,IntWritable, Text> {
  public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable,Text> output, Reporter reporter) throws IOException {

      // now you have all the values for a slaveID as key. Do whatever you like with that...
      for(Text value : values)
         output.collect(key, value)
  }
}
...