Сопрограммы построены на итераторах - специальный метод __await__
является обычным итератором.Это позволяет вам обернуть основной итератор в еще один итератор.Хитрость заключается в том, что вы должны развернуть итератор своей цели, используя __await__
, затем повторно обернуть свой собственный итератор, используя свой собственный __await__
.
Базовая функциональность, которая работает на экземплярах сопрограмм, выглядит следующим образом:
class CoroWrapper:
"""Wrap ``target`` to have every send issued in a ``context``"""
def __init__(self, target: 'Coroutine', context: 'ContextManager'):
self.target = target
self.context = context
# wrap an iterator for use with 'await'
def __await__(self):
# unwrap the underlying iterator
target_iter = self.target.__await__()
# emulate 'yield from'
iter_send, iter_throw = target_iter.send, target_iter.throw
send, message = iter_send, None
while True:
# communicate with the target coroutine
try:
with self.context:
signal = send(message)
except StopIteration as err:
return err.args[0] if err.args else None
else:
send = iter_send
# communicate with the ambient event loop
try:
message = yield signal
except BaseException as err:
send, message = iter_throw, err
Обратите внимание, что это явно работает на Coroutine
, а не на Awaitable
- Coroutine.__await__
реализует интерфейс генератора.Теоретически, Awaitable
не обязательно обеспечивает __await__().send
или __await__().throw
.
Этого достаточно для передачи и ввода сообщений:
import asyncio
class PrintContext:
def __enter__(self):
print('enter')
def __exit__(self, exc_type, exc_val, exc_tb):
print('exit via', exc_type)
return False
async def main_coro():
print(
'wrapper returned',
await CoroWrapper(test_coro(), PrintContext())
)
async def test_coro(delay=0.5):
await asyncio.sleep(delay)
return 2
asyncio.run(main_coro())
# enter
# exit via None
# enter
# exit <class 'StopIteration'>
# wrapper returned 2
Вы можете делегироватьоберточная часть в отдельный декоратор.Это также гарантирует, что у вас есть фактическая сопрограмма, а не пользовательский класс - некоторые асинхронные библиотеки требуют этого.
from functools import wraps
def send_context(context: 'ContextManager'):
"""Wrap a coroutine to issue every send in a context"""
def coro_wrapper(target: 'Callable[..., Coroutine]') -> 'Callable[..., Coroutine]':
@wraps(target)
async def context_coroutine(*args, **kwargs):
return await CoroWrapper(target(*args, **kwargs), context)
return context_coroutine
return coro_wrapper
Это позволяет вам напрямую декорировать функцию сопрограммы:
@send_context(PrintContext())
async def test_coro(delay=0.5):
await asyncio.sleep(delay)
return 2
print('async run returned:', asyncio.run(test_coro()))
# enter
# exit via None
# enter
# exit via <class 'StopIteration'>
# async run returned: 2