await queue.put (item) в asyncio.Queue, похоже, не освобождает управление циклом событий - PullRequest
0 голосов
/ 09 февраля 2019

В этом простом примере производителя / потребителя это выглядит так, как будто await queue.put(item) не освобождает цикл событий, чтобы позволить потребителю работать до его завершения.Это приводит к тому, что производитель помещает все свои товары в очередь, и только тогда потребитель получает их, чтобы снять их.

Ожидается ли это?

Я получаю результат, который ищу, еслиследуйте await queue.put(item) с await asyncio.sleep(0).

Затем производитель помещает 1 элемент в очередь, а потребители затем убирают 1 элемент из очереди.

Я получаю тот же результат в Python 3.6.8 и 3.7.2.

import asyncio

async def produce(queue, n):
    for x in range(1, n + 1):
        print('producing {}/{}'.format(x, n))
        item = str(x)
        await queue.put(item)
        # await asyncio.sleep(0)
    await queue.put(None)

async def consume(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print('consuming item {}...'.format(item))

loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()

1 Ответ

0 голосов
/ 09 февраля 2019

Это приводит к тому, что производитель помещает все свои предметы в очередь, и только тогда потребитель получает возможность их снять.Это ожидается?

Да.Проблема в том, что ваша очередь не ограничена, поэтому помещение чего-либо в нее никогда не приостанавливает продюсера и, следовательно, никогда не уступает другим сопрограммам.То же самое относится ко всем ожиданиям, которые немедленно предоставляют данные, например, , считанное в EOF .

Если цикл производителя содержал другой источник приостановки, такой как ожидание фактического ввода (он должен получитьв конце концов, из-за чего-то, что это приостановит, и проблема не будет сразу заметна.Принудительная приостановка, использующая asyncio.sleep(0), также работает, но она хрупкая, потому что для запуска потребителя используется одна подвеска.Это может не всегда иметь место, поскольку потребитель сам может ожидать некоторые события, отличные от очереди.

Неограниченная очередь имеет смысл в некоторых ситуациях, например, когда очередь предварительно заполнена задачами, илиархитектура производителя ограничивает количество предметов разумным количеством.Но если элементы очереди генерируются динамически, лучше добавить границу.Ограничение гарантирует обратное давление на производителя и гарантирует, что оно не монополизирует цикл обработки событий.

...