Я работаю над функцией appengine-mapreduce и изменил демо, чтобы соответствовать моей цели.
В основном у меня есть миллион над строками в следующем формате: userid, time1, time2. Моя цель - найти разницу между временем1 и временем2 для каждого идентификатора пользователя.
Однако, когда я запустил это в Google App Engine, я обнаружил это сообщение об ошибке в разделе журналов:
Превышено мягкое ограничение частной памяти: 180,56 МБ после обслуживания. Всего 130 запросов.
При обработке этого запроса было обнаружено, что процесс, который обработал этот запрос, использует слишком много памяти и был остановлен. Это может привести к использованию нового процесса для следующего запроса к вашему приложению. Если вы часто видите это сообщение, в вашем приложении может быть утечка памяти.
def time_count_map(data):
"""Time count map function."""
(entry, text_fn) = data
text = text_fn()
try:
q = text.split('\n')
for m in q:
reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
for s in reader:
"""Calculate time elapsed"""
sdw = s[1]
start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
edw = s[2]
end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
yield (s[0], time_difference)
except IndexError, e:
logging.debug(e)
def time_count_reduce(key, values):
"""Time count reduce function."""
time = 0.0
for subtime in values:
time += float(subtime)
realtime = int(time)
yield "%s: %d\n" % (key, realtime)
Кто-нибудь может подсказать, как еще я могу оптимизировать свой код лучше? Спасибо !!
Отредактировано:
Вот обработчик конвейера:
class TimeCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Time count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"time_count",
"main.time_count_map",
"main.time_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=32)
yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
params:
- name: done_callback
value: /done
mapper:
handler: main.lower_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
- name: Make messages upper case
params:
- name: done_callback
value: /done
mapper:
handler: main.upper_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
Остальные файлы точно такие же, как и у демо.
Я загрузил копию своих кодов в Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip