Как использовать веб-сокеты Python с asyncio в классе и с существующим циклом событий - PullRequest
1 голос
/ 10 июля 2020

В настоящее время я пытаюсь интегрировать интерфейс веб-сокета в свою программу. Для этой цели я использую модуль https://websockets.readthedocs.io/en/stable/intro.html вместе с asyncio. В настоящее время я изо всех сил пытаюсь реализовать функциональные возможности веб-сокета в специальном классе, который настраивает задачи веб-сокета для работы в параллельном режиме в том же событии l oop, что и задача, выполняемая в классе MyDriver.

main.py

from myDriver import MyDriver
from webSocketServer import WebSocketServer

async def main():

   # loop = asyncio.get_event_loop()
   driver = MyDriver()
   ws = WebSocketServer()
   await driver.drive()
   # The following does not integrate properly with the above. The msgHandler is not ran 
   await websockets.serve(lambda websocket, path: ws.msgHandler(websocket, path), "localhost", 5678)

asyncio.run(main())

Лямбда здесь, чтобы избавиться от аргумента self, исходящего от класса.

webSocketServer.py

import asyncio
import websockets

class WebSocketServer:

    def __init__(self):
        print('Init')

    async def msgHandler(self, websocket, path):
        self.sendTask = asyncio.create_task(self.sendHandler(websocket, path))
        self.receiveTask = asyncio.create_task(self.receiveHandler(websocket, path))

        await asyncio.wait([self.sendTask, self.receiveTask], return_when=asyncio.FIRST_COMPLETED)

    async def sendHandler(self, websocket, path):
        while True:
            await asyncio.sleep(2)
            message = producer()
            await websocket.send(message)

    async def receiveHandler(self, websocket, path):
        async for message in websocket:
            await self.printMsg()

    async def printMsg(self, msg):
        await asyncio.sleep(0.1)
        print(msg)

    def producer():
        return 'Hi !'

Я основывал свою реализацию на приведенных примерах на странице начала работы веб-сокетов. Они используют API loop.run_until_complete(server) и loop.run_forever(). Я также попытался использовать их, передав аргумент loop in конструктору WebSocketServer(loop) и выполнив там websockets.serve(lambda websocket, path: ws.msgHandler(websocket, path), "localhost", 5678), но затем я получил ошибку RuntimeError: This event loop is already running. Я также посмотрел на loop.create_task(), который принимает сопрограмму в качестве аргумента.

Кто-нибудь видит способ, которым я мог бы правильно интегрировать сервер websocket, работающий в том же событии l oop, что и другая моя задача? Спасибо!

1 Ответ

0 голосов
/ 12 июля 2020

Они используют API loop.run_until_complete(server) и loop.run_forever().

Хорошо, что вы преобразовали их в asyncio.run(), что является гораздо более надежным способом запуска asyncio код. Просто у вас преобразование неполное, отсутствует часть run_forever(). Что-то вроде run_forever() необходимо, потому что websockets.serve() не означает «обслужить и завершить работу», это означает «начать обслуживание и вернуть дескриптор серверу». Этот дескриптор возвращается (почти) немедленно, и если вы вернетесь из main в этот момент, как это делает ваш код, программа просто выйдет, прежде чем она обработает одно соединение.

Что вам нужно сделать, так это добавьте еще один await в конец main, где вы ждете столько времени, сколько нужно программе. Один из способов - это await asyncio.Event().wait(), который будет ждать вечно и является эквивалентом await loop.run_forever(). И поскольку в вашем распоряжении есть объект server, вы можете использовать его метод wait_closed(), чтобы ждать, пока сервер работает (т.е. сопрограмма намеренно закрывает его, вызывая server.close()), что, вероятно, именно то, что вам нужно.

Другими словами, вам нужно изменить последнюю строку main() на:

   server = await websockets.serve(lambda websocket, path: ws.msgHandler(websocket, path), "localhost", 5678)
   await server.wait_closed()
...