распараллеливание для цикла в asyncio - PullRequest
2 голосов
/ 04 октября 2019

В настоящее время у меня есть цикл for следующим образом

async def process(tasks, num):
      count = 0
      results = []
      for task in tasks:
           if count >= num:
               break
           result = await some_async_task(task)
           if result == 'foo':
               continue
           results.append(result)
           count+=1

Мне было интересно, могу ли я использовать здесь примитив collect или wait_for. Но я не уверен, как реализовать это, если логика там? Как .. Я не хочу, чтобы ненужное ожидание задачи, если count> = num. Если есть 20 задач и num = 4, я не хочу запускать все 20 задач.

Ответы [ 2 ]

2 голосов
/ 04 октября 2019

Вы можете обрабатывать задачи в пакетах размером, равным количеству результатов, которые вам все еще нужны. Если вы дадите такой пакет asyncio.gather(), он будет запускать их параллельно и сохранять порядок результатов. Например:

async def process(tasks, num):
    results = []
    task_iter = iter(tasks)
    while len(results) < num:
        next_batch = tuple(itertools.islice(task_iter, num - len(results)))
        if len(next_batch) == 0:
            break
        batch_results = await asyncio.gather(*next_batch)
        results.extend(r for r in batch_results if r == 'foo')
1 голос
/ 04 октября 2019

Этого легко достичь с помощью библиотеки aiostream . Вот рабочий пример:

import asyncio
from random import random
from aiostream import stream, pipe


async def some_async_task(i):
    await asyncio.sleep(random())
    return i if random() < 0.2 else None


async def process(task_args, n):
    return await (
        stream.iterate(task_args)
        | pipe.map(some_async_task, task_limit=n)
        | pipe.filter(bool)
        | pipe.take(n)
        | pipe.list()
    )


async def main():
    print(await process(task_args=range(100), n=10))


if __name__ == "__main__":
    asyncio.run(main())

Программа печатает список первых 10 успешно выполненных задач:

[1, 8, 16, 18, 19, 37, 42, 43, 45, 47]

Также обратите внимание, что вы можете настроить число some_async_task, котороеможет выполняться одновременно с использованием аргумента task_limit.

Отказ от ответственности: я сопровождающий проекта.

...