Python 3: Как отправить асинхронную функцию в ThreadPool? - PullRequest
0 голосов
/ 19 февраля 2019

Я хочу использовать как ThreadPoolExecutor из concurrent.futures, так и асинхронные функции.

Моя программа неоднократно отправляет функцию с различными входными значениями в пул потоков.Окончательная последовательность задач, которые выполняются в этой более крупной функции, может быть в любом порядке, и меня не волнует возвращаемое значение, просто они выполняются в какой-то момент в будущем.

Поэтому я попыталсясделать это

async def startLoop():

    while 1:
        for item in clients:
            arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))

        wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

, где переданная функция:

async def threadWork(obj):
   bool = do_something() # needs to execute before next functions
   if bool:
       do_a() # can be executed at any time
       do_b() # ^

, где do_b и do_a - асинхронные функции. Проблема в том, что я получаю ошибку: TypeError: object Future can't be used in 'await' expression и если я удаляю await, я получаю еще одну ошибку, говоря, что мне нужно добавить await.

Я думаю, я мог бы заставить все использовать потоки, но я действительно не хочу этого делать.

1 Ответ

0 голосов
/ 19 февраля 2019

Я рекомендую внимательно прочитать руководство по разработке Asyncio на Python 3 , в частности, раздел «Параллельность и многопоточность».

Основная концептуальная проблема в вашем примере - однопоточные циклы событий, поэтому не имеет смысла выполнять асинхронную сопрограмму в пуле потоков.Есть несколько способов взаимодействия циклов событий и потоков:

  • Цикл событий на поток.Например:

    async def threadWorkAsync(obj):
        b = do_something()
        if b:
            # Run a and b as concurrent tasks
            task_a = asyncio.create_task(do_a())
            task_b = asyncio.create_task(do_b())
            await task_a
            await task_b
    
    def threadWork(obj):
        # Create run loop for this thread and block until completion
        asyncio.run(threadWorkAsync())
    
    def startLoop():
        while 1:
            arrayOfFutures = []
            for item in clients:
                arrayOfFutures.append(config.threadPool.submit(threadWork, item))
    
            wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
    
  • Выполнить код блокировки в исполнителе.Это позволяет вам использовать асинхронные фьючерсы вместо одновременных фьючерсов, как указано выше.

    async def startLoop():
        while 1:
            arrayOfFutures = []
            for item in clients:
                arrayOfFutures.append(asyncio.run_in_executor(
                    config.threadPool, threadWork, item))
    
            await asyncio.gather(*arrayOfFutures)
    
  • Использование потоковых функций для отправки задач в циклы событий между потоками.Например, вместо создания цикла выполнения для каждого потока вы можете запустить все асинхронные сопрограммы в цикле выполнения основного потока:

    def threadWork(obj, loop):
        b = do_something()
        if b:
            future_a = asyncio.run_coroutine_threadsafe(do_a())
            future_b = asyncio.run_coroutine_threadsafe(do_b())
            concurrent.futures.wait([future_a, future_b])
    
    async def startLoop():
        loop = asyncio.get_running_loop()
        while 1:
            arrayOfFutures = []
            for item in clients:
                arrayOfFutures.append(asyncio.run_in_executor(
                    config.threadPool, threadWork, item, loop))
    
            await asyncio.gather(*arrayOfFutures)
    

    Примечание: это довольно запутанно, поэтому я не рекомендую его, но явключил это для полноты.

...