Простая крышка задачи с использованием asyncio.Sepmaphore
async def max10(task_generator):
semaphore = asyncio.Semaphore(10)
async def bounded(task):
async with semaphore:
return await task
async for task in task_generator:
asyncio.ensure_future(bounded(task))
Проблема этого решения заключается в том, что задачи жадно выводятся из генератора.Например, если генератор считывает данные из большой базы данных, программе может не хватить памяти.
Кроме этого, она идиоматична и хорошо себя ведет.
Решение, использующее asyncпротокол генератора для извлечения новых задач по требованию:
async def max10(task_generator):
tasks = set()
gen = task_generator.__aiter__()
try:
while True:
while len(tasks) < 10:
tasks.add(await gen.__anext__())
_done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
except StopAsyncIteration:
await asyncio.gather(*tasks)
Это может считаться неоптимальным, поскольку он не начинает выполнять задачи, пока не станут доступны 10.
А вотсжатое и волшебное решение, использующее рабочий шаблон :
async def max10(task_generator):
async def worker():
async for task in task_generator:
await task
await asyncio.gather(*[worker() for i in range(10)])
Оно опирается на несколько нелогичное свойство иметь возможность иметь несколько асинхронных итераторов для одного и того же асинхронного генератора, и в этом случае каждыйсгенерированный элемент виден только одним итератором.
В моей интуиции сказано, что ни одно из этих решений не работает должным образом при отмене .