Ниже приведен полный автономный пример, демонстрирующий один подход.Он заботится о запуске обратного вызова в потоке asyncio и обрабатывает исключения, вызванные обратным вызовом.
Работает в python 3.6.6.Интересно об использовании asyncio.get_event_loop()
здесь.Нам нужен цикл, так как loop.create_future()
является предпочтительным способом создания фьючерсов в asyncio.Однако в 3.7 мы бы предпочли asyncio.get_running_loop()
, что вызвало бы исключение, если цикл еще не был установлен.Возможно, лучший подход - это явно передать цикл в aio_callback_after_delay
, но это не соответствует существующему коду asyncio
, который часто делает цикл необязательным аргументом ключевого слова.Разъяснения по этому вопросу или любые другие улучшения будут приветствоваться!
import asyncio
import threading
import time
# This is the callback code we are trying to bridge
def callback_after_delay(secs, callback, *args):
"""Call callback(*args) after sleeping for secs seconds"""
def _target():
time.sleep(secs)
callback(*args)
thread = threading.Thread(target=_target)
thread.start()
# This is our wrapper
async def aio_callback_after_delay(secs, callback, *args):
loop = asyncio.get_event_loop()
f = loop.create_future()
def _inner():
try:
f.set_result(callback(*args))
except Exception as ex:
f.set_exception(ex)
callback_after_delay(secs, loop.call_soon_threadsafe, _inner)
return await f
#
# Below is test code to demonstrate things work
#
async def test_aio_callback_after_delay():
print('Before!')
await aio_callback_after_delay(1., print, "Hello, World!")
print('After!')
async def test_aio_callback_after_delay_exception():
def callback():
raise RuntimeError()
print('Before!')
await aio_callback_after_delay(1., callback)
print('After!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# Basic test
print('Basic Test')
loop.run_until_complete(test_aio_callback_after_delay())
# Test our implementation is truly async
print('Truly Async!')
loop.run_until_complete(
asyncio.gather(
*(test_aio_callback_after_delay() for i in range(0,5))
)
)
# Test exception handling
print('Exception Handling')
loop.run_until_complete(test_aio_callback_after_delay_exception())
Вывод будет выглядеть примерно так:
Basic Test
Before!
Hello, World
After!
Truly Async!
Before!
Before!
Before!
Before!
Before!
Hello, World
Hello, World
Hello, World
Hello, World
Hello, World
After!
After!
After!
After!
After!
Exception Handling
Before!
Traceback (most recent call last):
File "./scratch.py", line 60, in <module>
loop.run_until_complete(test_aio_callback_after_delay_exception())
File "\lib\asyncio\base_events.py", line 468, in run_until_complete
return future.result()
File "./scratch.py", line 40, in test_aio_callback_after_delay_exception
await aio_callback_after_delay(1., callback)
File "./scratch.py", line 26, in aio_callback_after_delay
return await f
File "./scratch.py", line 21, in _inner
f.set_result(callback(*args))
File "./scratch.py", line 37, in callback
raise RuntimeError()
RuntimeError