Python асинхронный генератор не асинхронный - PullRequest
4 голосов
/ 05 июля 2019

Мой код выглядит следующим образом.Я хочу, чтобы два режима сна могли использовать один и тот же период времени и занять 1 + 2 * 3 = 7 секунд для запуска сценария.Но, похоже, что-то не так произошло, так что это все равно занимает 3 * (1 + 2) секунды.

Есть ли идеи, как изменить код?

import asyncio

async def g():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for x in g():
        print(x)
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
loop.close()

Ответы [ 2 ]

1 голос
/ 05 июля 2019

В качестве альтернативы выполнению этого с очередью, вместо этого решение объединяет цепочку Futures вместе, так что результатом Future является текущий элемент, а другим Future - для получения следующего элемента (вроде как связанный список, так сказать):

from asyncio import sleep, get_event_loop, run, create_task

async def aiter(fut, async_generator):
    try:
        async for item in async_generator:
            fut, prev_fut = get_event_loop().create_future(), fut
            prev_fut.set_result((item, fut))
        else:
            fut.set_exception(StopAsyncIteration())
    except Exception as e:
        fut.set_exception(e)


async def concurrent(async_generator):
    fut = get_event_loop().create_future()
    create_task(aiter(fut, async_generator))

    try:
        while True:
            item, fut = await fut
            yield item
    except StopAsyncIteration as e:
        return

В качестве дополнительного бонуса это решение будет правильно обрабатывать исключение, возникающее в g (), путем повторного вызова исключения в методе main () с трассировкой, которая будет полезна для отладки.

1 голос
/ 05 июля 2019

Смысл async / await заключается в чередовании задач , а не функций / генераторов.Например, когда вы await asyncio.sleep(1), ваша текущая сопрограмма задерживается вместе со сном.Точно так же async for задерживает свою сопрограмму до тех пор, пока не будет готов следующий элемент.

Чтобы запустить отдельные функции, вы должны создать каждую часть как отдельную задачу.Используйте Queue для обмена предметами между ними - задания будут отложены только до тех пор, пока они не обменяются предметами.

from asyncio import Queue, sleep, run, gather


# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # resume once item is fetched
    await queue.put(None)


async def consumer(queue: Queue):
    x = await queue.get()  # resume once item is fetched
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()


async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )


run(main())

Если вам регулярно нужна эта функциональность, вы также можете поместить ее в помощник.объект, который оборачивает асинхронную итерацию.Помощник инкапсулирует очередь и отдельную задачу.Вы можете применить помощник непосредственно к асинхронной итерации в операторе async for.

from asyncio import Queue, sleep, run, ensure_future


# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()


# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)


run(main())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...