Предел памяти, достигнутый с appengine-mapreduce - PullRequest
8 голосов
/ 12 февраля 2012

Я работаю над функцией appengine-mapreduce и изменил демо, чтобы соответствовать моей цели. В основном у меня есть миллион над строками в следующем формате: userid, time1, time2. Моя цель - найти разницу между временем1 и временем2 для каждого идентификатора пользователя.

Однако, когда я запустил это в Google App Engine, я обнаружил это сообщение об ошибке в разделе журналов:

Превышено мягкое ограничение частной памяти: 180,56 МБ после обслуживания. Всего 130 запросов. При обработке этого запроса было обнаружено, что процесс, который обработал этот запрос, использует слишком много памяти и был остановлен. Это может привести к использованию нового процесса для следующего запроса к вашему приложению. Если вы часто видите это сообщение, в вашем приложении может быть утечка памяти.

def time_count_map(data):
  """Time count map function."""
  (entry, text_fn) = data
  text = text_fn()

  try:
    q = text.split('\n')
    for m in q:
        reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            """Calculate time elapsed"""
            sdw = s[1]
            start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
  except IndexError, e:
    logging.debug(e)


def time_count_reduce(key, values):
  """Time count reduce function."""
  time = 0.0
  for subtime in values:
    time += float(subtime)
    realtime = int(time)
  yield "%s: %d\n" % (key, realtime)

Кто-нибудь может подсказать, как еще я могу оптимизировать свой код лучше? Спасибо !!

Отредактировано:

Вот обработчик конвейера:

class TimeCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Time count demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "time_count",
        "main.time_count_map",
        "main.time_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=32)
    yield StoreOutput("TimeCount", filekey, output)

Mapreduce.yaml:

mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4

Остальные файлы точно такие же, как и у демо.

Я загрузил копию своих кодов в Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

Ответы [ 2 ]

6 голосов
/ 15 февраля 2012

Также рассмотрите возможность вызова gc.collect () в регулярных точках во время вашего кода.Я видел несколько SO вопросов о превышении мягких пределов памяти, которые были облегчены вызовом gc.collect (), большинство из которых связано с blobstore.

2 голосов
/ 13 февраля 2012

Вероятно, размер вашего входного файла превышает максимально допустимый объем памяти. Для больших файлов используйте BlobstoreLineInputReader или BlobstoreZipLineInputReader.

Эти читатели ввода передают что-то отличное от функции map, они пропускают start_position в файле и строку текста.

Ваша map функция может выглядеть примерно так:

def time_count_map(data):
    """Time count map function."""
    text = data[1]

    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            """Calculate time elapsed"""
            sdw = s[1]
            start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)

Использование BlobstoreLineInputReader позволит заданию работать намного быстрее, так как в нем может использоваться более одного шарда, вплоть до 256, но это означает, что вам нужно загружать файлы без сжатия, что может быть проблемой. Я справляюсь с этим, загружая сжатые файлы на сервер Windows EC2, затем распаковываю и загружаю оттуда, так как пропускная способность восходящего потока очень велика.

...