Из комментария:
Я стараюсь избегать поведения, когда потребитель не замечает смерти производителя и ждет в очереди бесконечно.Я хочу, чтобы потребитель был уведомлен о смерти производителя и имел возможность отреагировать.или просто потерпеть неудачу, и даже если он также ожидает в очереди.
Кроме ответа, представленного Игалом, другой способ - создать третью задачу, которая контролирует два, и отменяет один, когда другой заканчивает.Это можно обобщить для любых двух задач:
async def cancel_when_done(source, target):
assert isinstance(source, asyncio.Task)
assert isinstance(target, asyncio.Task)
try:
await source
except:
# SOURCE is a task which we expect to be awaited by someone else
pass
target.cancel()
Теперь при настройке производителя и потребителя вы можете связать их с помощью вышеуказанной функции.Например:
async def producer(q):
for i in itertools.count():
await q.put(i)
await asyncio.sleep(.2)
if i == 7:
1/0
async def consumer(q):
while True:
val = await q.get()
print('got', val)
async def main():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
p = loop.create_task(producer(queue))
c = loop.create_task(consumer(queue))
loop.create_task(cancel_when_done(p, c))
await asyncio.gather(p, c)
asyncio.get_event_loop().run_until_complete(main())