Подключиться к серверу веб-сокетов из скрипта Python? - PullRequest
0 голосов
/ 04 июля 2019

У меня есть приведенный ниже сервер веб-сокетов, который принимает подключения от веб-клиентов и передает данные всем клиентам, когда определенные клавиши нажимаются на любом из клиентов.В этом секторе все работает отлично.Однако у меня есть два внешних приложения Python, выполняющих различные операции на сервере, и мне также нужен сервер websocket, чтобы реагировать на их действия.Как я могу подключиться к серверу веб-сокетов из этих приложений, или это вообще возможно?

сервер веб-сокетов:

class Control(WebSocket):
    def handle(self):
        if self.data == "Left":
            Return = update.leftArrow()
            ClassValues = read_Class()
            LineValue = read_Line()
            message = "Line " + str(ClassValues[Return[1]][LineValue[Return[1]]])
        elif self.data == "Right":
            Return = update.rightArrow()
            LineValue = read_Line()
            message = "Line " + str(ClassValues[Return[1]][LineValue[Return[1]]])
        elif self.data == "Up":
            Return = update.upArrow()
            message = "Control " + str(Return[1])
        elif self.data == "Down":
            Return = update.downArrow()
            message = "Control " + str(Return[1])
        else:
            Return = [False, 0]
        if Return[0]:
            for client in self.server.connections.itervalues():
                client.sendMessage(message)

server = WebSocketServer('', 8000, Control)
server.serve_forever()

Ответы [ 2 ]

1 голос
/ 04 июля 2019

Вы можете использовать модуль websockets для подключения к вашей веб-розетке. Я думаю, что документация, доступная на этой странице, лучше, чем любое объяснение, которое я мог бы дать здесь. Это также довольно просто и легко понять.

0 голосов
/ 10 июля 2019

Исходя из ввода new-dev-123, у меня есть следующее решение:

import asyncio
import websockets

if TestCondition:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(SendMessage(message))

@asyncio.coroutine
def SendMessage(message):
    websocket = yield from websockets.connect('ws://localhost:8000')
    try:
        yield from websocket.send(message)
    finally:
        yield from websocket.close()

Обратите внимание, что это решение для версий Python меньше чем 3.6.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...