параллельное выполнение кода python2.7 ndb - PullRequest
0 голосов
/ 29 марта 2012

в моем приложении i для одного из обработчиков, мне нужно получить группу сущностей и выполнить функцию для каждой из них.

У меня есть ключи всех нужных мне сущностей.после их извлечения мне нужно выполнить 1 или 2 метода экземпляра для каждого из них, и это немного замедляет мое приложение.Выполнение этого для 100 сущностей занимает около 10 секунд, что очень медленно.

Я пытаюсь найти способ получить сущности и выполнять эти функции параллельно, чтобы сэкономить время, но я не совсем уверен, какой путь лучше.

я пробовал _post_get_hook, но у меня есть объект будущего, и мне нужно вызвать get_result () и выполнить функцию в хуке, которая работает нормально в sdk, но получает большую 'максимальную глубину рекурсии, превышеннуюво время вызова Python objec ', но я не могу понять, почему, и сообщение об ошибке не совсем продумано.

- это Pipeline api или ndb.Tasklets, что я ищу?

atm imидти методом проб и ошибок, но я был бы рад, если бы кто-то мог привести меня в правильном направлении.

РЕДАКТИРОВАТЬ

мой код похож на файловую систему, каждая папкасодержит другие папки и файлы.Путь коллекций установлен на другом объекте, поэтому для сериализации объекта коллекции мне нужно получить ссылочный объект и получить путь.В коллекции функция serialized_assets () работает медленнее, чем больше объектов в ней содержится.Если бы я мог выполнить функцию сериализации для каждого содержащегося актива рядом, это бы немного ускорило процесс.

class Index(ndb.Model):
    path = ndb.StringProperty()


class Folder(ndb.Model):
    label = ndb.StringProperty()
    index = ndb.KeyProperty()

    # contents is a list of keys of contaied Folders and Files
    contents = ndb.StringProperty(repeated=True)    

    def serialized_assets(self):
        assets = ndb.get_multi(self.contents)

        serialized_assets = []
        for a in assets:
            kind = a._get_kind()
            assetdict = a.to_dict()
            if kind == 'Collection':
                assetdict['path'] = asset.path
                # other operations ...
            elif kind == 'File':
                assetdict['another_prop'] = asset.another_property
                # ...
            serialized_assets.append(assetdict)

        return serialized_assets

    @property
    def path(self):
        return self.index.get().path


class File(ndb.Model):
    filename = ndb.StringProperty()
    # other properties....

    @property
    def another_property(self):
        # compute something here
        return computed_property

EDIT2:

    @ndb.tasklet
    def serialized_assets(self, keys=None):
        assets = yield ndb.get_multi_async(keys)
        raise ndb.Return([asset.serialized for asset in assets])

- этоэтот код тасклета в порядке?

Ответы [ 2 ]

2 голосов
/ 29 марта 2012

Поскольку большая часть времени выполнения ваших функций тратится на ожидание RPC, поддержка асинхронных и тасклетов в NDB - ваш лучший выбор.Это описано более подробно здесь .Самое простое использование для ваших требований - это, вероятно, использовать функцию ndb.map, например так (из документов):

@ndb.tasklet
def callback(msg):
  acct = yield ndb.get_async(msg.author)
  raise tasklet.Return('On %s, %s wrote:\n%s' % (msg.when, acct.nick(), msg.body))

qry = Messages.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
  print output

Функция обратного вызова вызывается для каждой сущности, возвращаемой запросом, и можетлюбые необходимые операции (с использованием _async методов и yield для их асинхронного выполнения), возвращая результат по завершении.Поскольку обратный вызов является тасклетом и использует yield для выполнения асинхронных вызовов, NDB может параллельно запускать несколько его экземпляров и даже группировать некоторые операции.

0 голосов
/ 29 марта 2012

API-интерфейс конвейера - это то, что вы хотите сделать.Есть ли причина, по которой вы не можете просто использовать очередь задач?

Используйте начальный запрос для получения всех ключей сущностей, а затем поставьте задачу в очередь для каждого ключа, в котором задача выполняет 2 функции для каждой сущности.Тогда параллелизм будет основан на количестве одновременных запросов, настроенных для этой очереди задач.

...