Почему повторение запроса ndb к хранилищу данных занимает слишком много памяти? - PullRequest
0 голосов
/ 14 января 2020

У меня есть запрос вроде:

query = HistoryLogs.query()
query = query.filter(HistoryLogs.exec_id == exec_id)
iter = query.iter()

for ent in iter:
    # write log to file, nothing memory intensive

Я добавил журналы для for l oop, и чтение 10K строк увеличивает использование памяти на 200 МБ, затем чтение следующих 10K строк добавляет дополнительные 200 МБ и т. Д. , Чтение 100 КБ требует 2 ГБ, что превышает ограничение памяти верхнего уровня.

Я попытался очистить memcache в for для l oop, после чтения 10K строк, добавив:

                # clear ndb cache in order to reduce memory footprint
                context = ndb.get_context()
                context.clear_cache()

в для l oop на каждой 10K-й итерации, но это приводило к тайм-ауту запроса, возникала ошибка BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results. ndb.

Я изначально ожидал, что при использовании query.iter() вместо query.fetch() Я бы не столкнулся с проблемой памяти, и память была бы почти постоянной, но это не так. Есть ли способ читать данные с помощью итератора, не превышая ни времени, ни ограничений памяти? Очистив кэш контекста, я вижу, что потребление памяти почти постоянно, но я столкнулся с проблемами со временем, необходимым для извлечения всех строк.

Кстати, есть много строк, которые можно получить, вплоть до 150К. Можно ли сделать это с помощью некоторых простых настроек или мне нужно более сложное решение, например, которое будет использовать параллелизацию?

Ответы [ 2 ]

2 голосов
/ 15 января 2020

Вы запускаете это в shell-api-shell? В противном случае я мог бы предположить, что максимальный тайм-аут запроса движка приложения станет проблемой.

Вы должны определенно запустить это в потоке данных Google вместо этого. Он будет распараллеливать это для вас / работать быстрее.

https://beam.apache.org/documentation/programming-guide/ https://beam.apache.org/releases/pydoc/2.17.0/index.html https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py

I Представьте, что ваш код линии будет выглядеть примерно так:

def run(project, gcs_output_prefix, exec_id):

    def format_result(element):
        csv_cells = [
            datastore_helper.get_value(element.properties['exec_id']),
            # extract other properties here!!!!!
        ]
        return ','.join([str(cell) for cell in csv_cells])

    pipeline_options = PipelineOptions([])
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)

    query = query_pb2.Query()
    query.kind.add().name = 'HistoryLogs'

    datastore_helper.set_property_filter(query.filter, 'exec_id', PropertyFilter.EQUAL, exec_id)

    _ = (p 
         | 'read' >> ReadFromDatastore(project, query, None)
         | 'format' >> beam.Map(format_result)
         | 'write' >> beam.io.WriteToText(file_path_prefix=gcs_output_prefix,
                                          file_name_suffix='.csv',
                                          num_shards=1) # limits output to a single file
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(project='YOUR-PROJECT-ID', 
        gcs_output_prefix='gs://YOUR-PROJECT-ID.appspot.com/history-logs-export/some-time-based-prefix/',
        exec_id='1234567890')

Этот код считывается из хранилища данных Google и экспортируется в облачное хранилище Google как CSV.

0 голосов
/ 18 января 2020

Типичное решение для предотвращения достижения ограничений по времени памяти и обработки запросов при работе с большими количествами объектов, полученных из запросов к хранилищам данных, состоит в том, чтобы разделить рабочую нагрузку на несколько управляемых кусков с помощью курсоров и распределить их по нескольким запросам. (например, с использованием задач очередей pu sh), в конечном итоге смещенных во времени, чтобы также предотвратить взрыв экземпляра и конфликты при доступе к выходному носителю (если есть).

Таким образом, вы можете обрабатывать практически неограниченные рабочие нагрузки, даже если по какой-либо причине вы не можете / не будете использовать хорошее решение для потока данных, предложенное Алексом.

Пример техники можно найти в Как удалить все записи из хранилища данных Google?

Но помните об ограничениях курсоров, см. Курсоры GAE Datastore постоянные и долговечные?

...