Обработка задач в пакетах в asyncio - PullRequest
0 голосов
/ 03 марта 2019

У меня есть функция, которая генерирует задачи (задачи, связанные с io):

def get_task():
    while True:
        new_task = _get_task()
        if new_task is not None:
            yield new_task
        else:
            sleep(1)

И я пытаюсь написать в asyncio потребителя, который будет одновременно обрабатывать до 10 задач, и одна задачазакончил потом возьму новый.Я не уверен, должен ли я использовать семафоры или есть какой-нибудь исполнитель asycio pool?Я начал писать псевдокод с потоками:

def run(self)
   while True:
       self.semaphore.acquire() # first acquire, then get task
       t = get_task()
       self.process_task(t)

def process_task(self, task):
   try:
       self.execute_task(task)
       self.mark_as_done(task)
   except:
       self.mark_as_failed(task)
   self.semaphore.release()

Может ли кто-нибудь мне помочь?Я понятия не имею, где поставить ключевые слова async / await

Ответы [ 3 ]

0 голосов
/ 04 марта 2019

Простая крышка задачи с использованием asyncio.Sepmaphore

async def max10(task_generator):
    semaphore = asyncio.Semaphore(10)

    async def bounded(task):
        async with semaphore:
            return await task

    async for task in task_generator:
        asyncio.ensure_future(bounded(task))

Проблема этого решения заключается в том, что задачи жадно выводятся из генератора.Например, если генератор считывает данные из большой базы данных, программе может не хватить памяти.

Кроме этого, она идиоматична и хорошо себя ведет.

Решение, использующее asyncпротокол генератора для извлечения новых задач по требованию:

async def max10(task_generator):
    tasks = set()
    gen = task_generator.__aiter__()
    try:
        while True:
            while len(tasks) < 10:
                tasks.add(await gen.__anext__())
            _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except StopAsyncIteration:
        await asyncio.gather(*tasks)

Это может считаться неоптимальным, поскольку он не начинает выполнять задачи, пока не станут доступны 10.

А вотсжатое и волшебное решение, использующее рабочий шаблон :

async def max10(task_generator):
    async def worker():
        async for task in task_generator:
            await task

    await asyncio.gather(*[worker() for i in range(10)])

Оно опирается на несколько нелогичное свойство иметь возможность иметь несколько асинхронных итераторов для одного и того же асинхронного генератора, и в этом случае каждыйсгенерированный элемент виден только одним итератором.

В моей интуиции сказано, что ни одно из этих решений не работает должным образом при отмене .

0 голосов
/ 05 марта 2019

Как указывает Дима Тисмек , использование семафоров для ограничения параллелизма уязвимо для слишком быстрого истощения task_generator, поскольку нет никакого противодействия между получением задач и отправкой их в цикл событий.Лучшим вариантом, также рассмотренным в другом ответе, является не создание задачи, как только генератор произвел предмет, а создание фиксированного числа рабочих, которые одновременно истощают генератор.

Есть дваобласти, где код может быть улучшен:

  • нет необходимости в семафоре - это излишне, когда число задач для начала фиксировано;
  • обработка отмены сгенерированных задачи задачи регулирования.

Вот реализация, которая решает обе проблемы:

async def throttle(task_generator, max_tasks):
    it = task_generator.__aiter__()
    cancelled = False
    async def worker():
        async for task in it:
            try:
                await task
            except asyncio.CancelledError:
                # If a generated task is canceled, let its worker
                # proceed with other tasks - except if it's the
                # outer coroutine that is cancelling us.
                if cancelled:
                    raise
            # other exceptions are propagated to the caller
    worker_tasks = [asyncio.create_task(worker())
                    for i in range(max_tasks)]
    try:
        await asyncio.gather(*worker_tasks)
    except:
        # In case of exception in one worker, or in case we're
        # being cancelled, cancel all workers and propagate the
        # exception.
        cancelled = True
        for t in worker_tasks:
            t.cancel()
        raise

Простой тестовый пример:

async def mock_task(num):
    print('running', num)
    await asyncio.sleep(random.uniform(1, 5))
    print('done', num)

async def mock_gen():
    tnum = 0
    while True:
        await asyncio.sleep(.1 * random.random())
        print('generating', tnum)
        yield asyncio.create_task(mock_task(tnum))
        tnum += 1

if __name__ == '__main__':
    asyncio.run(throttle(mock_gen(), 3))
0 голосов
/ 03 марта 2019

Async не является потоками.Например, если у вас есть задачи, связанные с файловым вводом-выводом, запишите их асинхронно, используя aiofiles

async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()

Затем замените задачу вашими задачами.Если вы хотите запускать только 10 за один раз, ожидайте asyncio.gather каждые 10 задач.

import asyncio

async def task(x):
  await asyncio.sleep(0.5)
  print( x, "is done" )

async def run(loop):
  futs = []
  for x in range(50):
    futs.append( task(x) )

  await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

Если вы не можете написать асинхронные задачи и нуждаетесь в потоках, это базовый пример с использованием ThreadPoolExecutor asyncio.Обратите внимание, что при max_workers = 5 одновременно выполняется только 5 задач.

import time
from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking(x):
  time.sleep(1)
  print( x, "is done" )

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(15):
    future = loop.run_in_executor(executor, blocking, x)
    futs.append( future )

  await asyncio.sleep(4)
  res = await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()
...