Объединение asyncio с несколькими рабочими ProcessPoolExecutor и async - PullRequest
1 голос
/ 05 мая 2019

Мой вопрос очень похож на Объединение asyncio с несколькими рабочими ProcessPoolExecutor - однако небольшое изменение (я считаю, что async for) делает отличные ответы там непригодными для меня.

Я пробую следующее MWE:

import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))

Выход из этого, как и ожидалось, не асинхронный:

MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2 
EXECUTOR: Completed blocking task number 2 
MASTER: Well done executor - you seem to have completed blocking task number 2

Я хотел бы настроить код так, чтобы задачи выполнялись в двух параллельных процессах и выводили выходные данные по мере их появления. Желаемый результат:

MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

Я понимаю из Объединение asyncio с несколькими рабочими ProcessPoolExecutor , что при моем нынешнем синтаксисе await loop.run_in_executor() блокирует. Я не знаю, как заменить его таким образом, чтобы async for мог перейти к следующему сгенерированному значению, ожидая, пока исполнитель завершит свою работу. Обратите внимание, я не использую asyncio.gather, как в их примере.

1 Ответ

0 голосов
/ 05 мая 2019

Если вы хотите, чтобы ваши задачи выполнялись максимум двумя процессами, самый простой способ добиться этого - создать исполнителя с max_workers=2.Затем вы можете отправлять задачи как можно быстрее, т.е. переходите к следующей итерации async for, не дожидаясь завершения предыдущей задачи.Вы можете собрать результаты всех задач в конце, чтобы исключения не остались незамеченными (и, возможно, получить фактические результаты).

Следующий код дает ожидаемый результат:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))

async def run_blocking(executor, task_no, delay):
    print('MASTER: Sending to executor blocking task number '
          + str(task_no))
    result = await loop.run_in_executor(executor, blocking, delay)
    print(result)
    print('MASTER: Well done executor - you seem to have completed '
          'blocking task number ' + str(task_no))

async def non_blocking(loop):
    tasks = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        async for i in mygen():
            # spawn the task and let it run in the background
            tasks.append(asyncio.create_task(
                run_blocking(executor, i + 1, i)))
        # if there was an exception, retrieve it now
        await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
...