Почему очередь asyncio ожидает блокировки get ()? - PullRequest
2 голосов
/ 30 мая 2019

Почему ждут queue.get () блокировка?

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

async def main():
    queue = asyncio.Queue()
    await consumer(queue)
    await producer(queue, 1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Если я позвоню продюсеру () до потребителя () , он будет работать нормально То есть, следующее работает отлично.

async def main():
    queue = asyncio.Queue()
    await producer(queue, 1)
    await consumer(queue)

Почему бы не await queue.get () возвращая управление обратно в цикл событий, чтобы можно было запустить сопрограмму производителя, которая заполнит очередь, чтобы queue.get () могла вернуться.

Ответы [ 3 ]

1 голос
/ 30 мая 2019

Вам нужно запустить потребителя и производителя параллельно, например, определив main следующим образом:

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(consumer(queue), producer(queue, 1))

Если по какой-то причине вы не можете использовать gather, тогда вы можете сделать (эквивалент) this:

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(consumer(queue))
    asyncio.create_task(producer(queue, 1))
    await asyncio.sleep(100)  # what your program actually does

Почему await queue.get() не возвращает управление в цикл событий, чтобы можно было запустить сопрограмму производителя, которая заполнит очередь, так что queue.get() можетreturn.

await queue.get() означает , возвращая управление обратно в цикл обработки событий.Но await означает wait , поэтому, когда ваша main сопрограмма говорит await consumer(queue), это означает «возобновите меня, как только consumer(queue) завершится».Поскольку consumer(queue) сам ожидает, что кто-то что-то произведет, у вас есть классический случай тупика.

Изменение порядка срабатывания работает только потому, что ваш продюсер однократный, поэтому он немедленно возвращается к вызывающей стороне.Если бы ваш продюсер ожидал внешнего источника (например, сокета), у вас тоже была бы тупиковая ситуация.Параллельный запуск их позволяет избежать тупика независимо от того, как написаны producer и consumer.

0 голосов
/ 30 мая 2019

Вы должны использовать .run_until_complete() с .gather()

Вот ваш обновленный код:

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(
    asyncio.gather(consumer(queue), producer(queue, 1))
)
loop.close()

Из:

val = 1

Также вы можете использовать .run_forever() с .create_task()

Итак, ваш фрагмент кода будет:

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.create_task(consumer(queue))
loop.create_task(producer(queue, 1))
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.close()

Из:

val = 1
0 голосов
/ 30 мая 2019

Это потому, что вы вызываете await consumer(queue), что означает, что следующая строка (procuder) не будет вызываться до тех пор, пока не вернется consumer, что, конечно, никогда не произойдет, потому что никто еще не произвел

посмотрите пример в документации и посмотрите, как он там используется: https://docs.python.org/3/library/asyncio-queue.html#examples

другой простой пример:

import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...