Как реализовать функцию итератора AsyncIO "new style" Python из класса "old style"? - PullRequest
1 голос
/ 07 марта 2020

Я пытаюсь выучить Python AsyncIO, но у меня много проблем с поиском хороших учебных пособий, актуальных и т. Д. c.

Допустим, у нас есть этот "старый стиль" asyn c класс итератора:

def chain(sink, *coro_pipeline):
    f = sink
    for coro_func, coro_args, coro_kwargs in coro_pipeline:
        f = coro_func(f, *coro_args, **coro_kwargs)
    return f

class sendable_deque(collections.deque):
    send = collections.deque.append


class AsyncIterator(object):

    def __init__(self, f, buf_size, *coro_pipeline):
        self.events = sendable_deque()
        self.coro = chain(self.events, *coro_pipeline)
        self.coro_finished = False
        self.f = f
        self.buf_size = buf_size

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.events:
            return self.events.popleft()
        if self.coro_finished:
            raise StopAsyncIteration
        while True:
            data = await self.f.read(self.buf_size)
            try:
                self.coro.send(data)
                if self.events:
                    return self.events.popleft()
            except StopIteration:
                self.coro_finished = True
                if self.events:
                    return self.events.popleft()
                raise StopAsyncIteration

# An example of a pipeline
def _parse_pipeline(parser, config):
    return (
        (parser['parse_basecoro'], [], {}),
        (parser['basic_parse_basecoro'], [], config)
    )

Как это теперь реализовано? Поскольку я склонен полагать, что мы используем что-то более похожее на:

async def async_parse(f, buf_size):
    while True:
         yield f.read(buf_size)

async def parse(f, buf_size):
    async with async_parse(f, buf_size) as pf:
        print(pf)

Но код в классе использует метод цепочки, который, я считаю, делается с asyncio.gather()?

Любой хороший Современные руководства и учебники также могут помочь.

1 Ответ

1 голос
/ 08 марта 2020

Используя асинхронный генератор c, вышеприведенный свернутый вручную асинхронный c итератор может быть выражен следующим образом (не проверено):

async def async_iterable(f, buf_size, coro_pipeline):
    events = sendable_deque()
    coro = utils.chain(events, *coro_pipeline)
    done = False
    while True:
        while events:
            yield events.popleft()
        if done:
            break
        data = await f.read(buf_size)
        try:
            coro.send(data)
        except StopIteration:
            done = True

В долгосрочной перспективе вам может потребоваться пересмотреть использование двунаправленные генераторы и замените их собственными асинхронными конструкциями, такими как queues .

...