Так что, если я действительно вас правильно понял, я бы разработал это следующим образом:
Вы бы использовали IdentityMapper, здесь никакой специальной логики не требуется.
Я бы собрал ключ и метку времени вместе в качестве ключа для пары. Оставляя ваше значение как значение пары:
- HadoopKey = {ключ | отметка времени}
- HadoopValue = {значение)
Теперь вам нужно реализовать свой собственный компаратор, чтобы пары с одним и тем же исходным ключом, но с другой временной меткой, распознавались как имеющие один и тот же ключ и, следовательно, шли вместе. (GroupingComparator)
И важно, что пары для редуктора упорядочены по метке времени по убыванию. (KeyComparator)
Посмотрите на
- Класс RawComparator,
- Jobconf's setOutputValueGroupingComparator () &
- метод setOutputKeyComparatorClass ()
- и «Hadoop - полное руководство», глава 4, стр. 100
- или просто спросите, нужна ли вам помощь; -)
Редуктор получит все пары с одним и тем же ключом - упс, спойлер ... , теперь они должны были быть отсортированы по отметке времени. Если первая и самая младшая временная метка пригодна для этой итерации, то все пары ключ-значение для этого вызова редуктора испускаются.
Если временная метка дисквалифицируется, пары с этим ключом не выдаются.
Я думаю, что это должно сделать.