Простой пример с использованием mapreduce в Google App Engine - PullRequest
9 голосов
/ 19 мая 2011

Я несколько запутался с текущим состоянием поддержки mapreduce в GAE.Согласно документам http://code.google.com/p/appengine-mapreduce/ фаза сокращения пока не поддерживается, но в описании сеанса с I / O 2011 (http://www.youtube.com/watch?v=EIxelKcyCC0) написано: «Теперь можно запустить полную Map Reduce.вакансии на App Engine ".Интересно, могу ли я использовать mapreduce в этой задаче:

Что я хочу сделать:

У меня есть модель автомобиля с цветом поля:

class Car(db.Model):
    color = db.StringProperty()

Я хочу запустить процесс mapreduce (время от времени определяемый cron), который может вычислить, сколько машин в каждом цвете, и сохранить этот результат в хранилище данных.Похоже на работу, хорошо подходящую для mapreduce (но если я ошибаюсь, поправьте меня), фаза "map" выдаст пары (, 1) для каждой сущности Car, а фаза "Reduce" должна объединить эти данные по color_name, что даст мне ожидаемые результаты,Конечный результат, который я хочу получить, это объекты с вычисленными данными, хранящимися в хранилище данных, что-то вроде этого:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()

Проблема: Я не знаю, как реализовать это в appengine ..На видео показаны примеры с определенными функциями map и Reduce, но они представляются очень общими примерами, не связанными с хранилищем данных.Все остальные примеры, которые я обнаружил, используют одну функцию для обработки данных из DatastoreInputReader, но они кажутся только фазой «карты», нет примера того, как выполнить «уменьшение» (и как сохранить результаты сокращения вхранилищу).

Ответы [ 2 ]

9 голосов
/ 19 мая 2011

Вам не нужна фаза сокращения. Это можно сделать с помощью линейной цепочки задач, более или менее следующим образом:

def count_colors(limit=100, totals={}, cursor=None):
  query = Car.all()
  if cursor:
    query.with_cursor(cursor)
  cars = query.fetch(limit)
  for car in cars:
    try:
      totals[car.color] += 1
    except KeyError:
      totals[car.color] = 1
  if len(cars) == limit:
    cursor = query.cursor()
    return deferred.defer(count_colors, limit, totals, cursor)
  entities = []
  for color in totals:
    entity = CarsByColor(key_name=color)
    entity.cars_num = totals[color]
    entities.append(entity)
  db.put(entities)

deferred.defer(count_colors)

Это должно выполнить итерацию по всем вашим машинам, передать курсор запроса и текущий подсчет ряду специальных задач и сохранить итоги в конце.

Фаза сокращения может иметь смысл, если вам нужно объединить данные из нескольких хранилищ данных, нескольких моделей или нескольких индексов в одну модель. Я не думаю, что это что-то купит.

Другой вариант: использовать очередь задач, чтобы поддерживать текущие счетчики для каждого цвета. Когда вы создаете автомобиль, начинайте задание, чтобы увеличить общее значение для этого цвета. Когда вы обновите автомобиль, начните одну задачу, чтобы уменьшить старый цвет, а другую, чтобы увеличить новый. Обновляйте счетчики транзакциями, чтобы избежать условий гонки.

6 голосов
/ 22 мая 2011

Я предоставляю здесь решение, которое я в конечном итоге выяснил, используя mapreduce из GAE (без фазы сокращения).Если бы я начал с нуля, я бы, вероятно, использовал решение, предоставленное Drew Sears .

Работает в GAE python 1.5.0

В app.yaml Я добавил обработчик для mapreduce:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

и обработчик длямой код для mapreduce (я использую url / mapred_update для сбора результатов, полученных mapreduce):

- url: /mapred_.*
  script: mapred.py

Создано mapreduce.yaml для обработки объектов Car:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

Объяснение: done_callback - это URL, который вызывается после того, как mapreduce завершает свои операции. mapred.process - это функция, которая обрабатывает отдельную сущность и счетчики обновлений (это определено в файле mapred.py).Модель Car определена в models.py

mapred.py :

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
                                     [('/mapred_update', UpdateCounters)],
                                     debug=True)
def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()            

Определение модели CarsByColor немного изменилось по сравнению с вопросом.

Вы можете запустить задание mapreduce вручную с URL: http://yourapp/mapreduce/ и, надеюсь, с cron (я еще не тестировал cron).

...