Сервер Python Autobahn Websocket получает по одному соединению за раз - PullRequest
0 голосов
/ 21 ноября 2018

Python Asyncio или Twisted, используемые Autobahn, должны обрабатывать одновременное соединение одновременно.Я следовал хорошему руководству по автобану read-the-doc, все работало хорошо, но сервер получает только одно соединение и обрабатывает свой запрос, а затем принимает второе.

Как я могу убедиться, чтосервер получает несколько соединений одновременно, не удерживая другого подключающегося партнера?

Я искал в Интернете весь день, но мой код не принес успеха (я отсеял много кода во время отладки)

from autobahn.asyncio.websocket import WebSocketServerProtocol
from autobahn.asyncio.websocket import WebSocketServerFactory

class NMmapperServerProtocol(WebSocketServerProtocol):
    cmd = NMmapperWSCommandParser() # I have cut out this due to debugging

    def onMessage(self, payload, isBinary):
        """
        @payload the message
        @isBinary whether it's a binary message
        """
        try:
            offload_payload = json.loads(payload.decode("utf-8"))
            await asyncio.gather(cmd.processWSCommands(offload_payload, self))
        except Exception as e:
            raise

    def onConnect(self, request):
        """
        When we've got a peer connect to our server
        """
        try:
            #print(self)
            print(request.peer, "Has connected")
        except Exception as e:
            raise

    def onOpen(self):
        """
        We have a fully connection
        """
        try:
            # Some database action can be made from here
            print("Connection now opened")
        except Exception as e:
            raise

    def onClose(self, wasClean, code, reason):
        """
        @ the client is closing his or her
        connection
        """
        try:
            print("wasClean ", wasClean)
            print("code ", code)
            print("reason ", reason)
        except Exception as e:
            raise

    # Setters
    def setCsrftoken(self, cookie_string):
        """
        @ parse an set
        """
        self.csrftoken = self.parse_csrftoken(cookie_string)

    # Setters
    def setSession(self, cookie_string):
        """
        @ parse an set
        """
        self.session = self.parse_session(cookie_string)


if __name__=="__main__":
    if(IN_PRODUCTION):
        print("RUNNING ")
        factory = NMmapperWSServerFactory(PRODUCTION_HOST, PRODUCTION_PORT)
        factory.run_loop()
    else:
        print("Running on dev")
        factory = WebSocketServerFactory()
        factory.protocol = NMmapperServerProtocol

        loop = asyncio.get_event_loop()
        coro = loop.create_server(factory, '0.0.0.0', 9000)
        server = loop.run_until_complete(coro)

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            server.close()
            loop.close()

Спасибо.

1 Ответ

0 голосов
/ 22 ноября 2018

Я наконец-то заработал, как и ожидалось.Будучи библиотекой asyncio, мне приходилось ставить префикс async для каждого метода, выполняющего долгосрочную задачу

. Проблема была в onMessage, мне приходилось обрабатывать сообщения параллельно, чтобы не блокировать других клиентов, желающих обрабатывать там сообщения.,для этого мне пришлось

offload_payload = json.loads(payload.decode("utf-8"))
loop = asyncio.get_event_loop()

# Offload command processing
loop.create_task(self.processWSCommands(offload_payload, self))

. Таким образом, каждое сообщение обрабатывается параллельно. Даже в таком случае убедитесь, что метод или функция, обрабатывающая сообщение, не блокируются.

from autobahn.asyncio.websocket import WebSocketServerProtocol
from autobahn.asyncio.websocket import WebSocketServerFactory

class NMmapperServerProtocol(WebSocketServerProtocol):
    cmd = NMmapperWSCommandParser() # I have cut out this due to debugging

    async def onMessage(self, payload, isBinary):
        """
        @payload the message
        @isBinary whether it's a binary message
        """
        try:
            offload_payload = json.loads(payload.decode("utf-8"))
            loop = asyncio.get_event_loop()
            #loop.create_task(runner(10, self.peer))
            #asyncio.gather(runner(20, self.peer))

            # Offload command processing
            loop.create_task(self.processWSCommands(offload_payload, self))
        except Exception as e:
            raise

    def onConnect(self, request):
        """
        When we've got a peer connect to our server
        """
        try:
            #print(self)
            print(request.peer, "Has connected")
        except Exception as e:
            raise

    def onOpen(self):
        """
        We have a fully connection
        """
        try:
            # Some database action can be made from here
            print("Connection now opened")
        except Exception as e:
            raise

    def onClose(self, wasClean, code, reason):
        """
        @ the client is closing his or her
        connection
        """
        try:
            print("wasClean ", wasClean)
            print("code ", code)
            print("reason ", reason)
        except Exception as e:
            raise

    # Setters
    def setCsrftoken(self, cookie_string):
        """
        @ parse an set
        """
        self.csrftoken = self.parse_csrftoken(cookie_string)

    # Setters
    def setSession(self, cookie_string):
        """
        @ parse an set
        """
        self.session = self.parse_session(cookie_string)


if __name__=="__main__":
    if(IN_PRODUCTION):
        print("RUNNING ")
        factory = NMmapperWSServerFactory(PRODUCTION_HOST, PRODUCTION_PORT)
        factory.run_loop()
    else:
        print("Running on dev")
        factory = WebSocketServerFactory()
        factory.protocol = NMmapperServerProtocol

        loop = asyncio.get_event_loop()
        coro = loop.create_server(factory, '0.0.0.0', 9000)
        server = loop.run_until_complete(coro)

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            server.close()
            loop.close()
...