Можно ли ограничить количество сопрограмм, работающих одновременно в асинхронном режиме? - PullRequest
0 голосов
/ 12 мая 2018

Я уже написал свой сценарий, используя asyncio, но обнаружил, что число сопрограмм, запущенных одновременно, слишком велико, и оно часто заканчивается зависанием.

Поэтому я хотел бы одновременно ограничить количество сопрограмм, и как только оно достигнет предела, я хочу дождаться завершения любой сопрограммы, прежде чем будет выполнена другая.

Мой текущий код выглядит примерно так:

loop = asyncio.get_event_loop()
p = map(my_func, players)
result = loop.run_until_complete(asyncio.gather(*p))

async def my_func(player):
    # something done with `await`

players имеет тип list и содержит много элементов (скажем, 12000). Ему нужно так много вычислительных ресурсов, чтобы запускать их все одновременно в asyncio.gather(*p), поэтому я бы предпочел, чтобы количество одновременно запущенных игроков составляло 200. Как только оно достигнет 199, я бы хотел, чтобы другая сопрограмма начала выполняться.

Возможно ли это в asyncio?

Ответы [ 2 ]

0 голосов
/ 14 мая 2018

Возможно, вы захотите использовать aiostream.stream.map с аргументом task_limit:

from aiostream import stream, pipe

async def main():
    xs = stream.iterate(players)
    ys = stream.map(xs, my_func, task_limit=100)
    zs = stream.list(ys)
    results = await zs

Тот же подход с использованием труб:

async def main():
    results = await (
        stream.iterate(players) | 
        pipe.map(my_func, task_limit=100) |
        pipe.list())

Для получения дополнительной информации см. Страницу проекта aiostream и документацию .

Отказ от ответственности: я поддерживаю проект.

0 голосов
/ 12 мая 2018

Я могу предложить использовать asyncio.BoundedSemaphore.

import asyncio

async def my_func(player, asyncio_semaphore):
    async with asyncio_semaphore:
        # do stuff

async def main():
    asyncio_semaphore = asyncio.BoundedSemaphore(200)
    jobs = []
    for i in range(12000):
        jobs.append(asyncio.ensure_future(my_func(players[i], asyncio_semaphore)))
    await asyncio.gather(*jobs)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main())

Таким образом, только 200 одновременных задач могут получить семафор и использовать системные ресурсы, в то время как под рукой 12000 задач.

...