Немедленно выполнить асинхронный обратный вызов из синхронного кода - PullRequest
4 голосов
/ 23 мая 2019

Проблема

У меня есть библиотека, которая в настоящее время не имеет поддержки асинхронности и должна вызываться из асинхронного кода.Асинхронный код вызывает библиотеку через обработчик (функция handler в коде ниже).В то время как обработчик выполняется, библиотека периодически вызывает обратный вызов (callback_wrapper), чтобы сообщить о прогрессе.

Синхронный обработчик выполняется в ThreadPoolExecutor, чтобы основной цикл событий мог обрабатывать дальнейшие событияв то время как обработчик работает.

В результате синхронный обратный вызов выполняется немедленно, а асинхронный обратный вызов выполняется только после выполнения основного обработчика.Желаемый результат - асинхронные обратные вызовы, которые должны быть выполнены немедленно.

Я предполагаю, что цикл обработки событий блокируется при вызове run_in_executor, но я не уверен, как решить эту проблему.

Код

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

loop = asyncio.get_event_loop()


def handler():
    print('handler started')
    callback_wrapper()
    time.sleep(1)
    print('handler stopped')


async def callback():
    print('callback')


def callback_wrapper():
    print('callback wrapper started')
    asyncio.ensure_future(callback(), loop=loop)
    print('callback wrapper stopped')


async def main():
    handler()


with ThreadPoolExecutor() as pool:
    async def thread_handler():
        await loop.run_in_executor(pool, handler)


    loop.run_until_complete(main())

Выход

handler started
callback wrapper started
callback wrapper stopped
handler stopped
callback

Требуемый выход

handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped

1 Ответ

1 голос
/ 24 мая 2019

благодаря вводу @ user4815162342 я нашел следующее решение:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

loop = asyncio.get_event_loop()


def handler():
    print('handler started')
    callback_wrapper()
    time.sleep(1)
    print('handler stopped')


async def callback():
    print('callback')


def callback_wrapper():
    print('callback wrapper started')
    asyncio.run_coroutine_threadsafe(callback(), loop).result()
    print('callback wrapper stopped')


async def main():
    await thread_handler()


with ThreadPoolExecutor() as pool:
    async def thread_handler():
        await loop.run_in_executor(pool, handler)


    loop.run_until_complete(main())

, которое дает желаемый результат:

handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped
...