Я предоставляю здесь решение, которое я в конечном итоге выяснил, используя mapreduce из GAE (без фазы сокращения).Если бы я начал с нуля, я бы, вероятно, использовал решение, предоставленное Drew Sears .
Работает в GAE python 1.5.0
В app.yaml Я добавил обработчик для mapreduce:
- url: /mapreduce(/.*)?
script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py
и обработчик длямой код для mapreduce (я использую url / mapred_update для сбора результатов, полученных mapreduce):
- url: /mapred_.*
script: mapred.py
Создано mapreduce.yaml для обработки объектов Car:
mapreduce:
- name: Color_Counter
params:
- name: done_callback
value: /mapred_update
mapper:
input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
handler: mapred.process
params:
- name: entity_kind
default: models.Car
Объяснение: done_callback - это URL, который вызывается после того, как mapreduce завершает свои операции. mapred.process - это функция, которая обрабатывает отдельную сущность и счетчики обновлений (это определено в файле mapred.py).Модель Car определена в models.py
mapred.py :
from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
def process(entity):
"""Process individual Car"""
color = entity.color
if color:
yield op.counters.Increment('car_color_%s' % color)
class UpdateCounters(webapp.RequestHandler):
"""Create stats models CarsByColor based on the data
gathered by mapreduce counters"""
def post(self):
"""Called after mapreduce operation are finished"""
# Finished mapreduce job id is passed in request headers
job_id = self.request.headers['Mapreduce-Id']
state = MapreduceState.get_by_job_id(job_id)
to_put = []
counters = state.counters_map.counters
# Remove counter not needed for stats
del counters['mapper_calls']
for counter in counters.keys():
stat = CarsByColor.get_by_key_name(counter)
if not stat:
stat = CarsByColor(key_name=counter,
name=counter)
stat.value = counters[counter]
to_put.append(stat)
db.put(to_put)
self.response.headers['Content-Type'] = 'text/plain'
self.response.out.write('Updated.')
application = webapp.WSGIApplication(
[('/mapred_update', UpdateCounters)],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
Определение модели CarsByColor немного изменилось по сравнению с вопросом.
Вы можете запустить задание mapreduce вручную с URL: http://yourapp/mapreduce/ и, надеюсь, с cron (я еще не тестировал cron).