В вашем вопросе отсутствует пара деталей, но если предположить, что something()
является асинхронным итератором или генератором, и вы хотите, чтобы item
было sentinel
каждый раз, когда something
не давало значение в течение тайм-аута, вот реализация timeout()
:
import asyncio
from typing import *
T = TypeVar('T')
# async generator, needs python 3.6
async def timeout(it: AsyncIterator[T], timeo: float, sentinel: T) -> AsyncGenerator[T, None]:
try:
nxt = asyncio.ensure_future(it.__anext__())
while True:
try:
yield await asyncio.wait_for(asyncio.shield(nxt), timeo)
nxt = asyncio.ensure_future(it.__anext__())
except asyncio.TimeoutError:
yield sentinel
except StopAsyncIteration:
pass
finally:
nxt.cancel() # in case we're getting cancelled our self
тест:
async def something():
yield 1
await asyncio.sleep(1.1)
yield 2
await asyncio.sleep(2.1)
yield 3
async def test():
expect = [1, None, 2, None, None, 3]
async for item in timeout(something(), 1, None):
print("Check", item)
assert item == expect.pop(0)
asyncio.get_event_loop().run_until_complete(test())
По истечении wait_for()
задание будет отменено. Поэтому нам нужно обернуть it.__anext__()
в задачу и затем защитить ее, чтобы иметь возможность возобновить итератор.