Я думаю, вы неправильно понимаете назначение ключа, который вы записываете для каждого сопоставленного значения. Цель ключа - сгруппировать элементы в конкретные вызовы редуктора. Поскольку вы хотите, чтобы все значения в вашем коде учитывались сразу, вам нужно использовать только один ключ, как показано ниже:
public class MyMapper<K extends WritableComparable, V extends Writable>
extends MapReduceBase implements Mapper<IntWriteable, WhateverTheInputTypeWas,
IntWriteable, Text> {
public void map(IntWriteable key, WhateverTheInputTypeWas val,
OutputCollector<IntWriteable, Text> output, Reporter reporter)
// do some processing
output.collect(new IntWriteable(1), ...);
}
}
Инфраструктура автоматически собирает все значения для определенного ключа и представляет их одним вызовом reduce
. Вот почему reduce
принимает Iterator
значений, а не только одно значение. Все, что вам нужно сделать, это выполнить итерацию по всему итератору, и когда hasNext()
возвращает false, именно тогда вы достигли конца ввода функции reduce
для этого конкретного ключа.
public static class Reduce extends MapReduceBase
implements Reducer<IntWritable, Text,
IntWritable, Text> {
public void reduce(IntWritable key, Iterator<Text> values,
OutputCollector<IntWritable, Text> output,
Reporter reporter) throws IOException {
int i=0
Text[] outputValues = new Text[7];
while (values.hasNext() && i < 7) {
outputValues[i++] = values.next();
}
// now output the contents of outputValues to the OutputCollector
}
}
Если вам нужны другие ключи для некоторых других вычислений, которые вы выполняете в редукторе, просто выведите их также из маппера и получите специальное значение часового (возможно, -1, в зависимости от того, что означают ваши ключи), которое получает вывод для каждого сопоставленного элемента данных, а затем просто запускайте эту специальную логику, только когда ключ равен значению часового.