Как связать низкоуровневый код на основе обратного вызова с высокоуровневым асинхронным / ожидающим кодом? - PullRequest
0 голосов
/ 13 февраля 2019

Документация asyncio.Futures гласит:

Будущие объекты используются для связывания низкоуровневого кода на основе обратного вызова с высокоуровневым асинхронным / ожидающим кодом.

Есть ли канонический пример того, как это делается?


Чтобы сделать пример более конкретным, предположим, что мы хотим обернуть следующую функцию, которая является типичной для основанной на обратном вызовеAPI.Чтобы было ясно: эта функция не может быть изменена - представьте, что это какая-то сложная сторонняя библиотека (которая, вероятно, использует внутренние потоки, которые мы не можем контролировать), которая хочет обратный вызов.

import threading
import time

def callback_after_delay(secs, callback, *args):
    """Call callback(*args) after sleeping for secs seconds"""
    def _target():
        time.sleep(secs)
        callback(*args)

    thread = threading.Thread(target=_target)
    thread.start()

Мы хотели бы бытьвозможность использовать нашу функцию оболочки как:

async def main():
    await aio_callback_after_delay(10., print, "Hello, World")

Ответы [ 2 ]

0 голосов
/ 13 февраля 2019

Просто используйте ThreadPoolExecutor.Код не меняется, за исключением того, как вы запускаете поток.Если вы удалите «return_exceptions» из вызова сбора, вы увидите исключение с полным отпечатком трассировки, так что вам решать, что вы хотите.

import time,random
from concurrent.futures import ThreadPoolExecutor
import asyncio

def cb():
  print("cb called")

def blocking():
  if random.randint(0,3) == 1:
    raise ValueError("Random Exception!")
  time.sleep(1)
  cb()
  return 5

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(5):
    future = loop.run_in_executor(executor, blocking)
    futs.append( future )

  res = await asyncio.gather( *futs, return_exceptions=True )
  for r in res:
    if isinstance(r, Exception):
      print("Exception:",r)

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

Выход

cb called
cb called
cb called
Exception: Random Exception!
Exception: Random Exception!
0 голосов
/ 13 февраля 2019

Ниже приведен полный автономный пример, демонстрирующий один подход.Он заботится о запуске обратного вызова в потоке asyncio и обрабатывает исключения, вызванные обратным вызовом.

Работает в python 3.6.6.Интересно об использовании asyncio.get_event_loop() здесь.Нам нужен цикл, так как loop.create_future() является предпочтительным способом создания фьючерсов в asyncio.Однако в 3.7 мы бы предпочли asyncio.get_running_loop(), что вызвало бы исключение, если цикл еще не был установлен.Возможно, лучший подход - это явно передать цикл в aio_callback_after_delay, но это не соответствует существующему коду asyncio, который часто делает цикл необязательным аргументом ключевого слова.Разъяснения по этому вопросу или любые другие улучшения будут приветствоваться!


import asyncio
import threading
import time


# This is the callback code we are trying to bridge

def callback_after_delay(secs, callback, *args):
    """Call callback(*args) after sleeping for secs seconds"""
    def _target():
        time.sleep(secs)
        callback(*args)

    thread = threading.Thread(target=_target)
    thread.start()


# This is our wrapper

async def aio_callback_after_delay(secs, callback, *args):
    loop = asyncio.get_event_loop()
    f = loop.create_future()

    def _inner():
        try:
            f.set_result(callback(*args))
        except Exception as ex:
            f.set_exception(ex)

    callback_after_delay(secs, loop.call_soon_threadsafe, _inner)
    return await f


#
# Below is test code to demonstrate things work
#

async def test_aio_callback_after_delay():
    print('Before!')
    await aio_callback_after_delay(1., print, "Hello, World!")
    print('After!')



async def test_aio_callback_after_delay_exception():

    def callback():
        raise RuntimeError()

    print('Before!')
    await aio_callback_after_delay(1., callback)
    print('After!')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Basic test
    print('Basic Test')
    loop.run_until_complete(test_aio_callback_after_delay())

    # Test our implementation is truly async
    print('Truly Async!')
    loop.run_until_complete(
        asyncio.gather(
            *(test_aio_callback_after_delay() for i in range(0,5))
        )
    )

    # Test exception handling
    print('Exception Handling')
    loop.run_until_complete(test_aio_callback_after_delay_exception())

Вывод будет выглядеть примерно так:

Basic Test
Before!
Hello, World
After!

Truly Async!
Before!
Before!
Before!
Before!
Before!
Hello, World
Hello, World
Hello, World
Hello, World
Hello, World
After!
After!
After!
After!
After!

Exception Handling
Before!
Traceback (most recent call last):
  File "./scratch.py", line 60, in <module>
    loop.run_until_complete(test_aio_callback_after_delay_exception())
  File "\lib\asyncio\base_events.py", line 468, in run_until_complete
    return future.result()
  File "./scratch.py", line 40, in test_aio_callback_after_delay_exception
    await aio_callback_after_delay(1., callback)
  File "./scratch.py", line 26, in aio_callback_after_delay
    return await f
  File "./scratch.py", line 21, in _inner
    f.set_result(callback(*args))
  File "./scratch.py", line 37, in callback
    raise RuntimeError()
RuntimeError
...