Использование asyncio.Queue для потока производитель-потребитель - PullRequest
0 голосов
/ 01 октября 2018

Я не совсем понимаю, как использовать asyncio.Queue для конкретного шаблона «производитель-потребитель», в котором как производитель, так и потребитель работают одновременно и независимо.

Сначала рассмотрим этот пример, который очень близко следует из документы для asyncio.Queue:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')

async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

if __name__ == '__main__':
    import sys
    n = 3 if len(sys.argv) == 1 else sys.argv[1]
    asyncio.run(main())

В этом скрипте есть одна более тонкая деталь: элементы помещаются в очередь синхронно, с queue.put_nowait(sleep_for) над обычным циклом for.

Моя цель - создать скрипт, который использует async def worker() (или consumer()) и async def producer().Оба должны быть запланированы для одновременной работы.Ни одна потребительская сопрограмма явно не связана или не связана с производителем.

Как я могу изменить вышеуказанную программу так, чтобы у производителя (ей) была собственная сопрограмма, которую можно запланировать одновременно с потребителями / работниками?


Второй пример из PYMOTW .Он требует, чтобы производитель заранее знал количество потребителей, и использует None в качестве сигнала потребителю о том, что производство завершено.

1 Ответ

0 голосов
/ 02 октября 2018

Как изменить указанную выше программу таким образом, чтобы производитель (и) представлял собой собственную сопрограмму, которую можно планировать одновременно с потребителями / работниками?

Пример можно обобщить безизменив его основную логику:

  • Переместите цикл вставки в отдельную сопрограмму производителя.
  • Запустите потребителей в фоновом режиме, позволяя им обрабатывать произведенные элементы.
  • Дождитесь, пока производитель (и) завершат, await, используя их, как с await producer() или await gather(*producers) и т. Д.
  • Как только все производители закончили, дождитесь обработки оставшихся произведенных изделий с помощьюawait queue.join()
  • Отмените потребителей, все из которых теперь бездействуют, ожидая следующего элемента в очереди, который никогда не прибудет.

Вот пример, реализующий вышеупомянутое:

import asyncio, random, time

async def rnd_sleep(t):
    # sleep for T seconds on average
    await asyncio.sleep(t * random.random() * 2)

async def producer(queue):
    while True:
        token = random.random()
        print(f'produced {token}')
        if token < .05:
            break
        await queue.put(token)
        await rnd_sleep(.1)

async def consumer(queue):
    while True:
        token = await queue.get()
        await rnd_sleep(.3)
        queue.task_done()
        print(f'consumed {token}')

async def main():
    queue = asyncio.Queue()

    # fire up the both producers and consumers
    producers = [asyncio.create_task(producer(queue))
                 for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue))
                 for _ in range(10)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print('---- done producing')

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

asyncio.run(main())
...