hadoop java: как узнать, что конец ввода редуктора достигнут? - PullRequest
0 голосов
/ 09 февраля 2012

Мой редуктор выглядит следующим образом

public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {

        List<Text> allRecords = new ArrayList<Text>();

        public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {

                allRecords.add(values.next());
                Text[] outputValues = new Text[7];
                for (int i=1; i>=7; i++) {
                    outputValues[i-1] = allRecords.get(allRecords.size() - i);
                }
        }
    }
  • У меня только один редуктор.
  • Мне нужно собрать первые 7 записей, когда редуктор завершит работу.

Вопрос

  • Как узнать, что получен конец ввода редуктора
    Спасибо

Ответы [ 4 ]

2 голосов
/ 09 февраля 2012

Я думаю, вы неправильно понимаете назначение ключа, который вы записываете для каждого сопоставленного значения. Цель ключа - сгруппировать элементы в конкретные вызовы редуктора. Поскольку вы хотите, чтобы все значения в вашем коде учитывались сразу, вам нужно использовать только один ключ, как показано ниже:

public class MyMapper<K extends WritableComparable, V extends Writable> 
     extends MapReduceBase implements Mapper<IntWriteable, WhateverTheInputTypeWas,
                                             IntWriteable, Text> {
  public void map(IntWriteable key, WhateverTheInputTypeWas val,
                  OutputCollector<IntWriteable, Text> output, Reporter reporter)

    // do some processing
    output.collect(new IntWriteable(1), ...);
  }
}

Инфраструктура автоматически собирает все значения для определенного ключа и представляет их одним вызовом reduce. Вот почему reduce принимает Iterator значений, а не только одно значение. Все, что вам нужно сделать, это выполнить итерацию по всему итератору, и когда hasNext() возвращает false, именно тогда вы достигли конца ввода функции reduce для этого конкретного ключа.

public static class Reduce extends MapReduceBase 
                           implements Reducer<IntWritable, Text, 
                                              IntWritable, Text> {

  public void reduce(IntWritable key, Iterator<Text> values,
                     OutputCollector<IntWritable, Text> output,
                     Reporter reporter) throws IOException {

    int i=0
    Text[] outputValues = new Text[7];
    while (values.hasNext() && i < 7) {
      outputValues[i++] = values.next();
    }
    // now output the contents of outputValues to the OutputCollector
  }
}

Если вам нужны другие ключи для некоторых других вычислений, которые вы выполняете в редукторе, просто выведите их также из маппера и получите специальное значение часового (возможно, -1, в зависимости от того, что означают ваши ключи), которое получает вывод для каждого сопоставленного элемента данных, а затем просто запускайте эту специальную логику, только когда ключ равен значению часового.

0 голосов
/ 12 апреля 2013

Уже поздно, но это может быть полезно для тех, кто ищет тот же вопрос.

Откройте файл и запишите его в тот файл, который вы хотите увидеть.

Например, чтобы увидеть, какой сотрудник Reduce выполняет какую часть вашего кода, вы можете сделать следующее:

class myReducer extends Reducer{
     File f;
     void setup(){
          // open your file here
     }
     void reduce(){
          //write key/value or whatever whatever you want to see here
          //and your reduce method
     }
}

Таким образом, вы можете легко увидеть, в чем ваша ошибка и т.д ...

0 голосов
/ 09 февраля 2012

Если я правильно понял Ваш вопрос, вам нужно уведомление, когда все данные обработаны редуктором.
Одна из таких известных мне точек зрения - метод close в формате вывода:
public void close (контекст TaskAttemptContext)
Вы можете переопределить этот метод в своем формате вывода.Он будет вызван после того, как связанный редуктор завершит свою работу.

0 голосов
/ 09 февраля 2012

Вы должны пройти через:

for (Text t : values) {

}

Или:

while (values.hasNext()) {
   Text t = values.next()
}
...