Неблокирующий запуск параллельных сопрограмм в Python - PullRequest
0 голосов
/ 18 января 2019

Я хочу выполнять задачи асинхронно и одновременно. Если task1 работает, когда приходит task2, task2 запускается сразу, без ожидания завершения task2. Также я хотел бы избежать обратных вызовов с помощью сопрограмм.

Вот параллельное решение с обратными вызовами:

def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class FibonacciCalculatorFuture:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)

    @staticmethod
    def calculate(n):
        print(f"started n={n}")
        return fibonacci(n)

    def run(self, n):
        future = self.pool.submit(self.calculate, n)
        future.add_done_callback(lambda f: print(f.result()))


if __name__ == '__main__':
    calculator = FibonacciCalculatorFuture()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")

Его выход:

started n=35
started n=32
initial thread can continue its work
3524578
14930352

А вот мое усилие избавиться от обратных вызовов:

class FibonacciCalculatorAsync:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.ensure_future(self.calculate(n))


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.run(35)
    calculator.run(32)
    calculator.loop.run_forever()
    print("initial thread can continue its work")

Выход:

started n=35
started n=32
3524578
14930352

В этом случае начальный поток не сможет идти дальше, чем loop.run_forever() и, следовательно, не сможет принимать новые задачи.

Итак, вот мой вопрос : есть ли способ одновременно:

  • выполнять задачи одновременно;
  • иметь возможность принимать новые задачи и планировать их выполнение сразу же (вместе с уже запущенными тактами);
  • использовать сопрограммы и код без обратных вызовов.

Ответы [ 2 ]

0 голосов
/ 18 января 2019

Второй пункт вашего вопроса можно получить, запустив asyncio в отдельной теме и используя asyncio.run_coroutine_threadsafe для планирования сопрограмм. Например:

class FibonacciCalculatorAsync:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def start_loop(self):
        thr = threading.Thread(target=self.loop.run_forever)
        thr.daemon = True
        thr.start()


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.start_loop()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")
    calculator.run(10)
    time.sleep(1)
0 голосов
/ 18 января 2019

loop.run_forever() действительно будет работать вечно, даже если внутри нет задач. Хорошей новостью является то, что вам не нужна эта функция. Чтобы дождаться завершения вычислений, используйте asyncio.gather:

class FibonacciCalculatorAsync:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        # self.loop = asyncio.get_event_loop()

    ...

    async def calculate(self, n):
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)


async def main():
    calculator = FibonacciCalculatorAsync()
    fib_35 = asyncio.ensure_future(calculator.run(35))
    fib_32 = asyncio.ensure_future(calculator.run(32))

    print("initial thread can continue its work")
    ...

    # demand fibonaccy computation has ended
    await asyncio.gather(fib_35, fib_32)


if __name__ == '__main__':
    asyncio.run(main())

Обратите внимание, как здесь обрабатывается цикл - я изменил несколько вещей. Если вы начнете использовать asyncio, я бы порекомендовал иметь один цикл для всех вещей вместо того, чтобы создавать циклы для более детальной задачи. При таком подходе вы получаете все asyncio навороты для обработки и синхронизации задач.

Кроме того, из-за GIL невозможно распараллелить чистый код без ввода-вывода Python в ThreadPoolExecutor. Помните об этом и в таких случаях предпочитайте исполнителя пула процессов.

...