Я безуспешно пытаюсь использовать AsyncIOEventEmitter
из библиотеки pyee . По какой-то причине переданное событие «Hi» никогда не достигает async_handler
для завершения asyncio future. Я также не нашел в Интернете подходящих примеров. Кроме того, я попытался предоставить текущее событие и использовать новое событие l oop для AsyncIOEventEmitter
, но оба дают одинаковый результат.
Может ли кто-нибудь мне помочь? Пример модульного теста ниже:
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter
LOG = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_setup(event_loop):
LOG.info("1 - start")
event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())
# Create a new Future object.
future_result = event_loop.create_future()
LOG.info("2 - emit event")
event_emitter.emit("event", "Hi")
@event_emitter.on("event")
async def async_handler(message):
LOG.info(">>> %s", message)
future_result.set_result(message)
return future_result
# Wait until *future_result* has a result and print it.
LOG.info(await future_result)
Спасибо!