Hadoop MapReduce с рекурсивной картой - PullRequest
4 голосов
/ 18 мая 2011

Мне нужно сделать приложение MapReduce на Java, которое должно быть авто-рекурсивным, то есть для каждой обработанной строки входного файла должно быть проверено все строки входных данных / записей Map на условие, проверенное функцией. Или, другими словами, Редуктор должен вызвать / прочитать всю карту для каждой полученной пары (ключ, значение).

Как лучше всего реализовать это на платформе Hadoop?

Я могу сделать это программно, прочитав ввод n раз или загрузив ввод в хэш-карту, но я думаю, что возможно сделать все это в парадигме MapReduce. Спасибо за любую помощь / совет! РЕДАКТИРОВАТЬ: Более подробно, у меня есть (в результате других заданий) список разделов пространства проблемы с (index, count) и я хочу сделать в качестве вывода (index, sumOfNearestNeighborsCounts), поэтому для каждый индекс, я хочу получить доступ к карте снова и для каждого индекса NearestNeighbor сумма подсчета событий. (См. также комментарий Costi Ciudatu)

Ответы [ 2 ]

2 голосов
/ 18 мая 2011

Для каждого индексного ключа вам нужно испускать ВСЕ возможные индексы соседей (которые вы должны иметь возможность производить математически).

Итак, давайте возьмем простой (линейный) пример. У вас есть одномерное пространство с {I1, I2, I3, I4}. Сосед будет просто означать «предыдущий или следующий элемент»: I1 соседствует с I2, но не с I3.

Для каждого индекса, поступающего в маппер, испускайте один ключ для каждого возможного соседа этого индекса (включая его самого! - мы определим, что каждый индекс является возможным соседом, но со специальным и абсурдным отрицательным значением для счетчика Объясню почему):

<I1, count(I1)> -> <I0, count(I1)>
                -> <I1, -1>
                -> <I2, count(I1)>

<I2, count(I2)> -> <I1, count(I2)>
                -> <I2, -1>
                -> <I3, count(I2)>

Теперь в редукторе вы получите следующие значения для каждого ключа:

I0: [ count(I1) ]
I1: [ count(I2), -1 ]
I2: [ count(I1), -1, count(I3) ]
...

В вашем редукторе итерируйте все значения соседей следующим образом:

boolean doesExist = false;
int sum = 0;
for (IntWritable value : values) {
    int count = value.get();
    if (count < 0) {
        doesExist = true;
    } else {
        sum += count;
    }
}
if (doesExist) {
    context.write(key, new IntWritable(sum));
}

Таким образом вы исключите (в приведенном выше примере) I0 и I4, которые не существуют и не будут иметь отрицательного значения в своих списках.


Теперь, чтобы приблизиться к вашему варианту использования, если вам нужны фактические значения индекса также во время итерации (а не только счетчики для всех соседей), вы можете сделать следующее:

Вместо того, чтобы выдавать простые числа из преобразователя, выведите несколько bean-компонентов-оболочек, содержащих как индекс, так и его счетчик. Таким образом, вы сможете исключить некоторых соседей на основе некоторых бизнес-ограничений или чего-то еще, но вы всегда будете работать только со списком (возможных) соседей для каждого данного индекса, а не со всем входным набором:

<I1, count(I1)> -> <I0, {I1, count(I1)}>
                -> <I1, {I1, count(I1)}>
                -> <I2, {I1, count(I1)}>
... and so on

Теперь в редукторе вы получите:

I0: [ {I1, count(I1)} ]
I1: [ {I1, count(I1)}, {I2, count(I2)} ]
I2: [ {I1, count(I1)}, {I2, count(I2)}, {I3, count(I3)} ]

Как вы можете заметить, искусственный подсчет -1 вам больше не нужен, так как для теста doesExist вы можете теперь проверить, имеет ли какой-либо bean-объект-оболочка в списке значений тот же индекс, что и индекс ключа.

Даже если число возможных соседей растет экспоненциально с количеством измерений (как вы уже упоминали), я бы сказал, что этот подход будет работать намного лучше, чем чтение всего ввода для каждой пары ключ / значение, и он намного лучше подходит на карте / уменьшить парадигму.

0 голосов
/ 18 мая 2011

В фазе карты выведите ключ для каждого соседа, а затем суммируйте при уменьшении.Псевдокод:

function map(index, count):
  for neighbor in neighbors(index):
     emit(neighbor, count)

function reduce(index, counts):
  total = sum(counts)
  emit(index, total)

Это не "рекурсивно", но оно должно решить вашу конкретную проблему, если я правильно понимаю.

...