Мой вопрос очень похож на Объединение asyncio с несколькими рабочими ProcessPoolExecutor - однако небольшое изменение (я считаю, что async for
) делает отличные ответы там непригодными для меня.
Я пробую следующее MWE:
import concurrent.futures
import asyncio
import time
async def mygen(u: int = 2):
i = 0
while i < u:
yield i
i += 1
def blocking(delay):
time.sleep(delay+1)
return('EXECUTOR: Completed blocking task number ' + str(delay+1))
async def non_blocking(loop):
with concurrent.futures.ProcessPoolExecutor() as executor:
async for i in mygen():
print('MASTER: Sending to executor blocking task number ' + str(i+1))
result = await loop.run_in_executor(executor, blocking, i)
print(result)
print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))
loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
Выход из этого, как и ожидалось, не асинхронный:
MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
Я хотел бы настроить код так, чтобы задачи выполнялись в двух параллельных процессах и выводили выходные данные по мере их появления. Желаемый результат:
MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
Я понимаю из Объединение asyncio с несколькими рабочими ProcessPoolExecutor , что при моем нынешнем синтаксисе await loop.run_in_executor()
блокирует. Я не знаю, как заменить его таким образом, чтобы async for
мог перейти к следующему сгенерированному значению, ожидая, пока исполнитель завершит свою работу. Обратите внимание, я не использую asyncio.gather
, как в их примере.