asyncio.Queue поток производитель-потребитель не может обрабатывать исключение, когда потребители содержатся в именованном списке - PullRequest
0 голосов
/ 07 августа 2020

Работа над потоком производитель-потребитель на основе asyncio.Queue. Коды ниже берут ссылку из этого ответа и этого блога .

import asyncio

async def produce(q: asyncio.Queue, t):
    asyncio.create_task(q.put(t))
    print(f'Produced {t}')

async def consume(q: asyncio.Queue):
    while True:
        res = await q.get()
        if res > 2:
            print(f'Cannot consume {res}')
            raise ValueError(f'{res} too big')
        print(f'Consumed {res}')
        q.task_done()

async def shutdown(loop, signal=None):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    print(f"Cancelling {len(tasks)} outstanding tasks")
    [task.cancel() for task in tasks]

def handle_exception(loop, context):
    msg = context.get("exception", context["message"])
    print(f"Caught exception: {msg}")
    asyncio.create_task(shutdown(loop))

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

    [asyncio.create_task(consume(queue)) for _ in range(1)]
    # consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    try:
        for i in range(6):
            await asyncio.create_task(produce(queue, i))
        await queue.join()
    except asyncio.exceptions.CancelledError:
        print('Cancelled')


asyncio.run(main())

При упаковке потребителей, как указано выше (без списка имен), результат будет таким, как ожидалось :

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Caught exception: 3 too big
Produced 4
Cancelling 2 outstanding tasks
Cancelled

Но при присвоении списку потребителей имени, что означает изменение кода внутри main(), например:

async def main():
    # <-- snip -->

    # [asyncio.create_task(consume(queue)) for _ in range(1)]
    consumers = [asyncio.create_task(consume(queue)) for _ in range(1)]

    # <-- snip -->

Программа застревает вот так:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Produced 5  # <- stuck here, have to manually stop by ^C

Похоже, что producer все еще продолжает производить, так что элементы в queue продолжают расти после поднятия ValueError. На handle_exception никогда не звонят. И программа застревает на await queue.join().

Но почему присвоение имени списку потребителей может изменить поведение кода? Почему handle_exception никогда не вызывается после того, как был назван список потребителей?

Ответы [ 2 ]

1 голос
/ 07 августа 2020

TL; DR Не используйте set_exception_handler для обработки исключений в задачах. Вместо этого добавьте реквизит try: ... except: ... в саму сопрограмму.

Проблема заключается в попытке использовать set_exception_handler для обработки исключений. Эта функция является последней попыткой обнаружить исключение, которое прошло до события l oop, скорее всего, в результате ошибки в программе. Если обратный вызов добавлен с помощью loop.call_soon или loop.call_at et c. вызывает исключение (и не улавливает его), обработчик, установленный set_exception_handler, будет последовательно вызываться.

С задачей все более тонко: задача доводит сопрограмму до завершения и, когда она выполняется, сохраняет свой результат , делая его доступным для всех, кто ожидает выполнения задачи, для обратных вызовов, установленных add_done_callback, а также для любого вызова, вызывающего result() в задаче. (Все это предусмотрено контрактом Future, который Task является подклассом.) Когда сопрограмма вызывает необработанное исключение, это исключение является просто еще одним результатом: когда кто-то ожидает задачу или вызывает result(), исключение будет (повторно) вызвано тут же.

Это приводит к разнице между именованием и отсутствием именования объектов задачи. Если вы не укажете их имена, они будут уничтожены, как только событие l oop завершит их выполнение. В момент их уничтожения Python заметит, что никто никогда не обращался к их результату, и передаст его обработчику исключения. С другой стороны, если вы сохраните их в переменной, они не будут уничтожены, пока на них ссылается переменная, и не будет причин для вызова обработчика события l oop: насколько Python обеспокоен, вы можете решить вызвать .result() для объектов в любой момент, получить доступ к исключению и обработать его в соответствии с вашей программой.

Чтобы устранить проблему, просто обработайте исключение самостоятельно, добавив try: ... except: ... блок вокруг тела сопрограммы. Если вы не контролируете сопрограмму, вы можете вместо этого использовать add_done_callback() для обнаружения исключения.

1 голос
/ 07 августа 2020

Дело не в названном списке. Ваш пример можно упростить до:

asyncio.create_task(consume(queue))
# consumer = asyncio.create_task(consume(queue))

Дело здесь в объекте Task, который возвращает функция create_task. В одном случае он разрушен, а в другом - нет. Хорошие ответы даны здесь и здесь

...