Новичок в Hadoop и dumbo, как правильно упорядочить эти операции? - PullRequest
2 голосов
/ 29 января 2010

Рассмотрим следующий формат файла журнала:

id        v1        v2        v3
1         15        30        25
2         10        10        20
3         50        30        30

Мы должны вычислить среднее значение частоты (AVF) для каждой строки данных в кластере Hadoop с использованием dumbo. AVF для точки данных с m атрибутами определяется как:

avf = (1/m)* sum (frequencies of attributes 1..m)

так для первого ряда, avf = (1/3) * (1 + 2 + 1) ~ = 1,33. Выброс идентифицируется низким AVF.

Задача программирования

У нас есть следующий псевдо / python-код:

H = {}  # stores attribute frequencies

map1(_, datapoint): # 
  for attr in datapoint.attrs:
    yield (attr, 1)

reduce1(attr, values):
  H[attr] = sum(values)

map2(_, datapoint):
  sum = 0
  m = len(datapoint.attrs)
  for attr in datapoint.attrs:
    sum += H[attr]        

  yield (1/m)*sum, datapoint

reduce2(avf, datapoints): # identity reducer, only sorts datapoints on avf
  yield avf, datapoints

Проблема в том, как подключить наш набор точек данных к map1 и map2, а также использовать промежуточный хеш H в map2. * Глобальное определение H, как указано выше, похоже на нарушение концепции MapReduce.

1 Ответ

0 голосов
/ 03 февраля 2010

Если я понимаю, первым шагом является вычисление гистограммы:

[attr, value] => frequency

где frequency - количество раз, которое value произошло в столбце attr.

Следующий шаг - взять таблицу гистограмм и исходные данные, для каждой строки рассчитать AVF и отсортировать их.

Я бы сделал это в два прохода: один проход с уменьшением карты для вычисления гистограммы, второй проход m-r для поиска AVF с использованием гистограммы. Я бы также использовал одну константу без хэша без вины, так как получение значений гистограммы и значений ячеек в одной и той же местности будет грязным зверем. (Например, map1 испускает [attr val id] с [attr val] в качестве ключа; а lower1 накапливает все записи для каждого ключа, подсчитывает их и выдает [id attr val count]. Второй проход использует id в качестве ключа для повторной сборки и затем усреднения каждый ряд).


Чтобы вычислить гистограмму, полезно думать о среднем шаге как о «группе», а не «сортировке». Вот как это делается: поскольку вход редуктора сортируется по ключу, он собирает все записи для данного ключа и, как только он видит другой ключ, генерирует счетчик. Wukong, рубиновый эквивалент dumbo, имеет Accumulator, и я предполагаю, что dumbo тоже. (См. Ниже рабочий код).

Это оставляет вас с

attr1    val1a      frequency
attr1    val1b      frequency
attr2    val2a      frequency
...
attrN    attrNz     frequency

Для следующего прохода я загружу эти данные в хеш-таблицу - простой Hash (dictionary), если он помещается в память, быстрое хранилище значений ключей, если нет - и вычислю каждую запись AVF так же, как у вас.


Здесь работает код ruby ​​для расчета avf; см http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rb

Первый проход

module AverageValueFrequency
  # Names for each column's attribute, in order
  ATTR_NAMES = %w[length width height]

  class HistogramMapper < Wukong::Streamer::RecordStreamer
    def process id, *values
      ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
    end
  end

  #
  # For an accumulator, you define a key that is used to group records
  #
  # The Accumulator calls #start! on the first record for that group,
  # then calls #accumulate on all records (including the first).
  # Finally, it calls #finalize to emit a result for the group.
  #
  class HistogramReducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :count

    # use the attr and val as the key
    def get_key attr, val, *_
      [attr, val]
    end

    # start the sum with 0 for each key
    def start! *_
      self.count = 0
    end
    # ... and count the number of records for this key
    def accumulate *_
      self.count += 1
    end
    # emit [attr, val, count]
    def finalize
      yield [key, count].flatten
    end
  end
end

Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run

Второй проход

module AverageValueFrequency
  class AvfRecordMapper < Wukong::Streamer::RecordStreamer
    # average the frequency of each value
    def process id, *values
      sum = 0.0
      ATTR_NAMES.zip(values).each do |attr, val|
        sum += histogram[ [attr, val] ].to_i
      end
      avf = sum / ATTR_NAMES.length.to_f
      yield [id, avf, *values]
    end

    # Load the histogram from a tab-separated file with
    #   attr    val   freq
    def histogram
      return @histogram if @histogram
      @histogram = { }
      File.open(options[:histogram_file]).each do |line|
        attr, val, freq = line.chomp.split("\t")
        @histogram[ [attr, val] ] = freq
      end
      @histogram
    end
  end
end
...