Я пробую каркас mapreduce из (http://code.google.com/p/appengine-mapreduce/) и немного изменил демонстрационное приложение (используйте mapreduce.input_readers.DatastoreInputReader вместо mapreduce.input_readers.BlobstoreZipInputReader).
Я установил 2 класса конвейера:
class IndexPipeline(base_handler.PipelineBase):
def run(self):
output = yield mapreduce_pipeline.MapreducePipeline(
"index",
"main.index_map", #added higher up in code
"main.index_reduce", #added higher up in code
"mapreduce.input_readers.DatastoreInputReader",
mapper_params={
"entity_kind": "model.SearchRecords",
},
shards=16)
yield StoreOutput("Index", output)
class StoreOutput(base_handler.PipelineBase):
def run(self, mr_type, encoded_key):
logging.info("output is %s %s" % (mr_type, str(encoded_key)))
if encoded_key:
key = db.Key(encoded=encoded_key)
m = db.get(key)
yield op.db.Put(m)
И запустите его с:
pipeline = IndexPipeline()
pipeline.start()
Но я продолжаю получать эту ошибку:
Handler yielded two: ['a'] , but no output writer is set.
Я пытался найти где-нибудь в источнике , где установить выходной модуль записи, но безуспешно. Единственное, что я обнаружил, это то, что нужно установить где-нибудь output_writer_class
.
Кто-нибудь знает, как это установить?
Если заметить, аргумент encoded_key
в StoreOutput
всегда выглядит как None.