Распараллеливание Ruby-редукторов в Hadoop? - PullRequest
4 голосов
/ 08 мая 2009

Простой редуктор количества слов в Ruby выглядит так:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end

wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end

получает в STDIN все промежуточные значения преобразователей. Не с определенного ключа. Таким образом, на самом деле существует только ОДИН редуктор для всех (а не редуктор для каждого слова или набора слов).

Однако на примерах Java я видел этот интерфейс, который получает ключ и список значений как inout. Это означает, что промежуточные значения карты группируются по ключу перед уменьшением, а редукторы могут работать параллельно:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();
              }
              output.collect(key, new IntWritable(sum));
            }
          }

Это только функция Java? Или я могу сделать это с помощью Hadoop Streaming с использованием Ruby?

Ответы [ 2 ]

5 голосов
/ 09 мая 2009

Редукторы всегда будут работать параллельно, независимо от того, используете ли вы потоковую передачу или нет (если вы этого не видите, убедитесь, что конфигурация задания настроена на несколько задач сокращения - см. Mapred.reduce.tasks в вашем кластере или настройка работы). Разница в том, что фреймворк упаковывает вещи немного приятнее для вас, когда вы используете Java, а не потоковую передачу.

Для Java задача сокращения получает итератор для всех значений для определенного ключа. Это позволяет легко проходить значения, если вы, скажем, суммируете выходные данные карты в своей задаче сокращения. При потоковой передаче вы буквально получаете поток пар ключ-значение. Вы гарантированно гарантируете, что значения будут упорядочены по ключу, и что для данного ключа не будет разбито между задачами сокращения, но любое отслеживание состояния вам нужно. Например, в Java ваша карта выводится на ваш редуктор символически в виде

key1, {val1, val2, val3} key2, {val7, val8}

При потоковой передаче ваш вывод выглядит как

key1, val1 key1, val2 key1, val3 key2, val7 key2, val8

Например, чтобы написать редуктор, который вычисляет сумму значений для каждого ключа, вам понадобится переменная для хранения последнего ключа, который вы видели, и переменная для хранения суммы. Каждый раз, когда вы читаете новую пару ключ-значение, вы делаете следующее:

  1. проверить, отличается ли ключ от последнего ключа.
  2. если это так, выведите ключ и текущую сумму и обнулите сумму до нуля.
  3. добавить текущее значение к вашей сумме и установить последний ключ для текущего ключа.

НТН.

1 голос
/ 08 мая 2009

Я сам не пробовал Hadoop Streaming, но, читая документы, я думаю, что вы можете добиться аналогичного параллельного поведения.

Вместо передачи ключа со связанными значениями в каждый редуктор потоковая передача сгруппирует выходные данные сопоставителя по ключам. Это также гарантирует, что значения с одинаковыми ключами не будут разделены на несколько редукторов. Это несколько отличается от обычной функциональности Hadoop, но даже в этом случае работа сокращения будет распределена по нескольким редукторам.

Попробуйте использовать опцию -verbose, чтобы получить больше информации о том, что на самом деле происходит. Вы также можете попробовать поэкспериментировать с опцией -D mapred.reduce.tasks=X, где X - желаемое количество редукторов.

...