Если свести мою проблему к игрушечному примеру, скажем, у меня есть это:
import asyncio
async def run(gateway):
my_var = 42
response = await gateway.get_response(my_var)
print(f"I got a response: {response}")
class Gateway:
def __init__(self):
self.response = None
self.outer_var = None
async def get_response(self, some_var):
self.outer_var = some_var
while self.response is None:
await asyncio.sleep(1)
response = self.response
self.response = None
return response
И затем я хочу проверить это как
from somewhere import run, Gateway
def test_run():
gateway = Gateway()
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(run(gateway), loop)
assert gateway.outer_var == 42
gateway.response = "hello"
# Then, after less than a second the print should be executed
К тому времени, когда он доберется до диссертация внешнего вар все еще None
. Я не уверен, понятно ли, что я пытаюсь сделать, но это будет что-то вроде запуска run()
в фоновом режиме, и что он останавливается в ожидании ответа, а когда он получает, он может возобновить обычное выполнение. Таким образом, run()
будет декларативным процессом и все же допускает какое-то взаимодействие (связь с основным процессом). То, что я пытаюсь сделать, даже возможно?