/ 16 марта 2020
  1. Если я запустите этот код, он будет зависать без выброса ZeroDivisionError.
  2. Если я перейду await asyncio.gather(*tasks, return_exceptions=True) выше await queue.join(), он, наконец, выкинет ZeroDivisionError и остановится.
  3. Если я затем закомментирую 1 / 0 и выполню, он выполнит все, но будет висеть в конце.

Теперь вопрос в том, как я могу достичь и:

  1. Возможность видеть неожиданные исключения, как в случае 2 выше, и. ..
  2. На самом деле остановить, когда все задачи выполнены в очереди


import asyncio
import random
import time

async def worker(name, queue):
    while True:
        print('Get a "work item" out of the queue.')
        sleep_for = await queue.get()

        print('Sleep for the "sleep_for" seconds.')
        await asyncio.sleep(sleep_for)

        # Error on purpose
        1 / 0

        print('Notify the queue that the "work item" has been processed.')

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    print('Create a queue that we will use to store our "workload".')
    queue = asyncio.Queue()

    print('Generate random timings and put them into the queue.')
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for

    print('Create three worker tasks to process the queue concurrently.')
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))

    print('Wait until the queue is fully processed.')
    started_at = time.monotonic()

    print('Joining queue')
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    print('Cancel our worker tasks.')
    for task in tasks:

    print('Wait until all worker tasks are cancelled.')
    await asyncio.gather(*tasks, return_exceptions=True)

    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

/ 16 марта 2020

Есть несколько способов подойти к этому, но центральная идея состоит в том, что в asyncio, в отличие от многопоточной обработки c, легко ожидать нескольких вещей одновременно.

Например, вы можете ждать queue.join() и рабочие задачи, в зависимости от того, что выполнено первым. Так как работники не заканчивают нормально (вы отменяете их позже), работник, выполняющий завершение, означает, что он поднял.

# convert queue.join() to a full-fledged task, so we can test
# whether it's done
queue_complete = asyncio.create_task(queue.join())

# wait for the queue to complete or one of the workers to exit
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)

if not queue_complete.done():
    # If the queue hasn't completed, it means one of the workers has
    # raised - find it and propagate the exception.  You can also
    # use t.exception() to get the exception object. Canceling other
    # tasks is another possibility.
    for t in tasks:
        if t.done():
            t.result()  # this will raise