Я пытаюсь найти простой способ снятия стека с очереди, получая ее порции, указав chunk_size и timeout .
Например, я хочу, чтобы функция get_chunks
возвращала либо список chunk_size элементов, если требуется меньше timeout , чтобы получить их, иначе список длины между 9 и chunk_size .
Вот код на данный момент:
import asyncio
async def populate(queue):
for i in range(0, 100):
await queue.put(i)
async def _get_chunks(queue, chunk_size):
items = []
for i in range(0, chunk_size):
items.append(await queue.get())
await asyncio.sleep(0.2)
return items
async def get_chunks(queue, chunk_size, timeout):
while True:
yield _get_chunks(queue, chunk_size)
async def listen():
queue = asyncio.Queue()
await populate(queue)
print(f'{queue.qsize()} items in queue')
async for chunk in get_chunks(queue, 10, 1):
print(await chunk)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(listen())
if __name__ == '__main__':
main()
Я думаю, что есть способ сделать это, используя asyncio.wait
такой, что:
done, not_done = asyncio.wait([_get_chunks(queue, size),
asyncio.sleep(timeout)],
return_when=asyncio.FIRST_COMPLETE)
items = done.pop().result()
но мне не удается получить результат, когда asyncio.sleep
возвращается первым.