Asyncio взаимодействует с событием l oop после его запуска - PullRequest
1 голос
/ 02 марта 2020

У меня есть класс, представляющий соединение с удаленным сервером.

class Remote:

    def __init__(self) -> None:
        self._writer, self._reader = None, None
        self._queue = Queue()

    async def start(self) -> None:
        self._reader, self._writer = await asyncio.open_connection(
            ...some host, ...some port)

        while True:
            reading = asyncio.create_task(self.recv())
            writing = asyncio.create_task(self.send())
            await asyncio.gather(reading, writing)

    async def stop(self) -> None:
        self._writer.close()
        await self._writer.wait_closed()

    async def recv(self) -> None:
        data = await self._reader.read(1024)
        print('Recieved:', data)

    async def send(self) -> None:
        if not self._queue.empty():
            message = self._queue.get()
            print('Sending:', message)
            self._writer.write(message)
            await self._writer.drain()

    def queue_message(self, code: int,
                      **kwargs: Dict[str, Union[str, int]]) -> None:
        # Do stuff and return a byte representation of a protocol message.
        message = ServerMessage.write_message(code, **kwargs)
        self._queue.put(message)

Я хочу запустить сопрограмму start () и поместить сообщения в очередь (см. Метод queue_message) впоследствии. Как я могу это сделать? Я, вероятно, могу использовать темы для этого, но какие еще есть варианты? Может быть, я просто не понимаю концепцию asyncio.

Вот пример, который я хочу сделать.

client = Remote()
asyncio.run(client.start())
client.queue_message(1, username='...', password='...')
asyncio.sleep(5.0)  # Do some other stuff.
client.queue_message(2, listen_port='...')
asyncio.run(client.stop())

Это очень просто реализовать с помощью потоков, но я хочу исследовать некоторые другие варианты .

1 Ответ

0 голосов
/ 03 марта 2020

Код, который вы предусмотрели, не будет работать, потому что asyncio.run создает новое событие l oop при каждом вызове, и Queue s (и, возможно, другие объекты асинхронной синхронизации) нельзя использовать в циклах событий. Вы должны сделать что-то вроде:

async def my_app():
    client = Remote()
    await client.start()
    client.queue_message(1, username='...', password='...')
    await asyncio.sleep(5.0)  # Do some other stuff.
    client.queue_message(2, listen_port='...')
    await client.stop()

asyncio.run(my_app())

Также обратите внимание, что вы должны изменить self._queue.put на self._queue.put_nowait или, альтернативно, await и сделать функцию asyn c.

...