Размер кусков или время ожидания Asyncio Python - PullRequest
0 голосов
/ 06 сентября 2018

Я пытаюсь найти простой способ снятия стека с очереди, получая ее порции, указав chunk_size и timeout .

Например, я хочу, чтобы функция get_chunks возвращала либо список chunk_size элементов, если требуется меньше timeout , чтобы получить их, иначе список длины между 9 и chunk_size .

Вот код на данный момент:

import asyncio

async def populate(queue):
    for i in range(0, 100):
        await queue.put(i)

async def _get_chunks(queue, chunk_size):
    items = []
    for i in range(0, chunk_size):
        items.append(await queue.get())
        await asyncio.sleep(0.2)
    return items

async def get_chunks(queue, chunk_size, timeout):
    while True:
        yield _get_chunks(queue, chunk_size)

async def listen():
    queue = asyncio.Queue()
    await populate(queue)
    print(f'{queue.qsize()} items in queue')
    async for chunk in get_chunks(queue, 10, 1):
        print(await chunk)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(listen())

if __name__ == '__main__':
    main()  

Я думаю, что есть способ сделать это, используя asyncio.wait такой, что:

done, not_done = asyncio.wait([_get_chunks(queue, size),
                               asyncio.sleep(timeout)],
                              return_when=asyncio.FIRST_COMPLETE)
items = done.pop().result()

но мне не удается получить результат, когда asyncio.sleep возвращается первым.

1 Ответ

0 голосов
/ 06 сентября 2018

Вы не можете получить результат, потому что _get_chunks еще не закончен. Простой обходной путь - иметь некоторое общее состояние между _get_chunks и его вызывающей стороной:

async def _get_chunks(queue, chunk_size, out):
    for i in range(0, chunk_size):
        out.append(await queue.get())
        await asyncio.sleep(0.2)

Затем вы можете реализовать тайм-аут, используя wait_for, который автоматически отменит сопрограмму тайм-аута:

items = []
try:
    asyncio.wait_for(_get_chunk(queue, size, items))
except asyncio.TimeoutError:
    pass
# items now contains the elements _get_chunk managed to extract
# from the queue within the alotted time
...