Объедините python asyncio с эмиттером событий pyee - PullRequest
0 голосов
/ 10 июля 2020

Я безуспешно пытаюсь использовать AsyncIOEventEmitter из библиотеки pyee . По какой-то причине переданное событие «Hi» никогда не достигает async_handler для завершения asyncio future. Я также не нашел в Интернете подходящих примеров. Кроме того, я попытался предоставить текущее событие и использовать новое событие l oop для AsyncIOEventEmitter, но оба дают одинаковый результат.

Может ли кто-нибудь мне помочь? Пример модульного теста ниже:

import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter

LOG = logging.getLogger(__name__)

@pytest.mark.asyncio
async def test_setup(event_loop):
    LOG.info("1 - start")
    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())

    # Create a new Future object.
    future_result = event_loop.create_future()
    LOG.info("2 - emit event")
    event_emitter.emit("event", "Hi")

    @event_emitter.on("event")
    async def async_handler(message):
        LOG.info(">>> %s", message)
        future_result.set_result(message)
        return future_result

    # Wait until *future_result* has a result and print it.
    LOG.info(await future_result)

Спасибо!

1 Ответ

1 голос
/ 10 июля 2020

хорошо, разобрался, метод async_handler должен быть определен ранее в тесте ...

Теперь это сработало:

"""Event emitter playground"""
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter

LOG = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_setup(event_loop):
    """Receive event from emitter and complete future!"""
    LOG.info("1 - start")
    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())

    @event_emitter.on("event")
    def async_handler(message):
        LOG.info(">>> %s", message)
        future_result.set_result(message)

    future_result = event_loop.create_future()
    LOG.info("2 - emit event")
    event_emitter.emit("event", "Hi")

    LOG.info(await future_result)
...