Скрученный Python: итераторы и выходы / inlineCallbacks - PullRequest
18 голосов
/ 12 мая 2011

Folks, Я полностью сбит с толку, так что, возможно, я даже не спрашиваю правильно, но здесь идет речь:

У меня есть скрученное приложение, использующее inlineCallbacks. Теперь мне нужно определить итератор, который будет означать, что генератор возвращается вызывающей стороне. Тем не менее, итератор не может быть украшен inlineCallbacks, не так ли? Если нет, то как я могу кодировать что-то вроде этого.

Просто чтобы уточнить: цель состоит в том, что process_loop нужно вызывать каждые, скажем, 5 секунд, он может обработать только ОДИН кусок, скажем, 10, а затем он должен отпустить. Однако, чтобы знать, что блок 10 (хранится в кэшированном, что является диктатом), он должен вызвать функцию, которая возвращает отложенное.

@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k,v in cachedvalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'

Ответы [ 3 ]

12 голосов
/ 12 мая 2011

Вы правы, что не можете выразить то, что хотите выразить в cacheiter.Декоратор inlineCallbacks не позволит вам иметь функцию, которая возвращает итератор.Если вы украсите функцию с ее помощью, то результатом будет функция, которая всегда возвращает Deferred.Вот для чего это.

Отчасти это затрудняет то, что итераторы плохо работают с асинхронным кодом.Если в создании элементов вашего итератора участвует Deferred, то элементы, которые выходят из вашего итератора, сначала будут Deferreds.

Вы можете сделать что-то подобное, чтобы учесть это:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

Это может сработать, но выглядит особенно странно.Поскольку генераторы могут уступать только своему вызывающему (а не, например, вызывающему), итератор some_jobs ничего не может с этим поделать;только лексический код внутри process_work может привести к ожиданию батута, обеспеченного inlineCallbacks.

Если вы не возражаете против этого паттерна, то мы можем представить, что ваш код написан примерно так:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

Еще один подход, который вы можете использовать, это использование twisted.internet.task.cooperate.cooperate берет итератор и потребляет его, предполагая, что его потребление потенциально дорого, и распределяя работу по нескольким итерациям реактора.Принимая определение cacheiter сверху:

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
1 голос
/ 12 мая 2011

Я думаю, вы пытаетесь это сделать:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k,v in cachedvalue.items())
            returnValue(gen)

Когда genexp не может выразить то, что вы пытаетесь вернуть, вы можете написать закрытие:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachedvalue=cachedvalue, cachename=cachename):
                for k,v in cachedvalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it
0 голосов
/ 12 мая 2011

Попробуйте написать свой итератор как DeferredGenerator.

...