Python и WebSockets - как отправить с сервера на клиент без блокировки? - PullRequest
1 голос
/ 12 марта 2020

Может кто-нибудь объяснить или, еще лучше, привести рабочий пример того, как написать Python клиент и сервер, где сервер может инициировать отправку через WebSocket в любое произвольное время на основе обработки logi c и клиента можно получить в любое время? Оба должны быть освобождены для выполнения других операций обработки в качестве цели большинства.

Я много гуглил для этого, в том числе изучал Торнадо , который, кажется, классифицирует c Python option и websockets , и asyncio , которые кажутся более новым способом. В любом случае каждый рабочий пример, который я могу найти, основан на том, что клиент инициирует связь. Некоторые из них охватывают отправку с сервера на клиент в виде эха, когда логика c клиента инициирует серию связи, но мне нужно получить пример работы, где сервер инициирует связь.

Многие Tornado примеры тайм-аут, который не будет работать в моем случае. Все примеры websockets и asyncio представляют собой либо клиент-сервер, и / или имеют строку, подобную somethingOrOther.run(), которая блокирует выполнение на клиенте или сервере, или и то, и другое. Кроме того, во многих примерах один клиент или сервер написан на Python, а другой - на JavaScript (мне нужны оба в Python).

мне не нужен соединение должно быть полнодуплексным, то есть клиент никогда не должен отправлять серверу на тот же сокет, за исключением, возможно, отправки подтверждения.

Этот вопрос кажется чтобы быть похожим, однако, код автора не является полным, нет принятого ответа, и один предоставленный ответ (не принятый) не содержит полный код, и клиент находится в JavaScript (мне нужны оба в Python ).

Вот пример того, что я пытаюсь сделать, используя обычные сокеты:

сервер:

# server.py

import socket
import time
import random

# module-level variables
HOST='127.0.0.1'
PORT=65439

ACK_TEXT = 'ack_text'


def main():

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((HOST, PORT))
    sock.listen()
    conn, addr = sock.accept()

    while True:
        # In an actual program some significant processing/logic would happen here that would take time
        # and also produce a certain result, depending on which we may or may not need to send a message.
        # To simulate this, we'll pause for a short and random amount of time, then get a random integer.
        time.sleep(random.uniform(0.5, 1.0))
        myRandInt = random.randint(0, 10)

        # if the ranom integer is even, send a message
        if myRandInt % 2 == 0:
            message = str(myRandInt)
            print('sending message: ' + message)
            sendTextViaSocket(conn, message)
        # end if
    # end while

# end function

def sendTextViaSocket(conn, message):

    # encode the message
    encodedMessage = bytes(message, 'utf-8')
    # send the encoded message
    conn.sendall(encodedMessage)

    # receive the encoded acknowledgement
    encodedAck = conn.recv(1024)
    # decode the acknowledgement
    ack = encodedAck.decode('utf-8')
    # check the received acknowledgement is correct, if not log an error
    if ack == ACK_TEXT:
        pass
    else:
        print('error: acknowledgement was received as ' + str(ack))
    # end if
# end function

if __name__ == '__main__':
    main()

клиент:

# client

import socket
import select
import time
import random

# module-level variables
HOST='127.0.0.1'
PORT=65439

ACK_TEXT = 'ack_text'


def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # if the client is started 1st connect will crash, so continually try to connect
    # in a try-catch until the server is available
    connectionSuccessful = False
    while not connectionSuccessful:
        try:
            sock.connect((HOST, PORT))
            connectionSuccessful = True
        except:
            pass
        # end try
    # end while

    socks = [ sock ]

    # endlessly receive messages
    while True:

        readySocks, _, _ = select.select(socks, [], [], 0)

        for readySock in readySocks:
            message = receiveTextViaSocket(readySock)
            print('received message = ' + str(message))
        # end for

        # In an actual program client would have other significant tasks to do
        # To simulate this we'll pause for a short but random amount of time
        time.sleep(random.uniform(0.5, 1.0))
    # end while

# end function

def receiveTextViaSocket(sock):
    # receive the encoded message
    encodedMessage = sock.recv(1024)
    # decode the message
    message = encodedMessage.decode('utf-8')

    # now send the acknowledgement
    # encode the acknowledgement
    encodedAck = bytes(ACK_TEXT, 'utf-8')
    # send the encoded acknowledgement
    sock.sendall(encodedAck)

    return message
# end function

if __name__ == '__main__':
    main()

Как я могу сделать то же самое, но с WebSockets ??

--- Edit ---

Потратив большую часть сегодняшнего дня на это, это лучшее, что я сделал, так далеко:

сервер:

# server.py

import threading
import asyncio
import websockets
import collections
import random
import time

sendMessageQueue = collections.deque()

def main():
    webSocketServer = websockets.serve(sendMessages, 'localhost', 8765)
    myLoop = asyncio.get_event_loop()

    webSockThread = threading.Thread(target=webSockStart, args=(webSocketServer, myLoop,))

    webSockThread.start()

    while True:
        myRandInt = random.randint(1, 10)
        print('appending ' + str(myRandInt))
        sendMessageQueue.append(str(myRandInt))
        print('sendMessageQueue = ' + str(sendMessageQueue))
        time.sleep(random.uniform(1.0, 2.0))
    # end while

# end function

def webSockStart(webSocketServer, myLoop):
    myLoop.run_until_complete(webSocketServer)
    myLoop.run_forever()
# end function

async def sendMessages(websocket, path):
    while len(sendMessageQueue) > 0:
        await websocket.send(sendMessageQueue.popleft())
    # end while
# end function

if __name__ == '__main__':
    main()

клиент:

# client.py

import threading
import asyncio
import websockets
import collections
import random
import time

receiveMessageQueue = collections.deque()

def main():
    receiveWebSockThread = threading.Thread(target=receiveWebSockStart)
    receiveWebSockThread.start()

    while True:
        print('doing other stuff')
        time.sleep(1.0)

        while len(receiveMessageQueue) > 0:
            message = receiveMessageQueue.popleft()
            print('message = ' + str(message))
        # end while
    # end while

# end function

def receiveWebSockStart():
    loop = asyncio.new_event_loop()
    loop.run_until_complete(receiveMessages())
    loop.run_forever()
# end function

async def receiveMessages():
    while True:
        uri = 'ws://localhost:8765'
        async with websockets.connect(uri) as webSockConn:
            message = await webSockConn.recv()
            receiveMessageQueue.append(str(message))
            # while True:

            # end while
        # end with
# end function

if __name__ == '__main__':
    main()

выход сервера: * 10 43 *

$ python3 server.py
appending 3
sendMessageQueue = deque(['3'])
appending 8
sendMessageQueue = deque(['8'])
appending 8
sendMessageQueue = deque(['8', '8'])
appending 6
sendMessageQueue = deque(['8', '8', '6'])
appending 1
sendMessageQueue = deque(['8', '8', '6', '1'])

вывод клиента:

$ python3 client.py 
doing other stuff
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "client.py", line 30, in receiveWebSockStart
    loop.run_until_complete(receiveMessages())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "client.py", line 38, in receiveMessages
    message = await webSockConn.recv()
  File "/usr/local/lib/python3.6/dist-packages/websockets/protocol.py", line 509, in recv
    await self.ensure_open()
  File "/usr/local/lib/python3.6/dist-packages/websockets/protocol.py", line 812, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason

message = 3
doing other stuff
doing other stuff
doing other stuff
doing other stuff

Сервер, кажется, работает должным образом, однако клиент отключается после получения одного сообщения, и я не уверен, почему или как его решить , Я не уверен, что потоки - даже правильный подход, или я должен был использовать run_in_executor от concurrent.futures, но я не мог заставить это работать вообще. Любой, у кого есть определенные c знания по этой теме c, пожалуйста, помогите.

--- Edit2 ---

Ну, по крайней мере, я нашел ответ, который работает (проверено с Python 3.6.9 в Ubuntu 18.04), но я действительно недоволен этим:

сервер:

# server.py

import threading
import asyncio
import websockets
import collections
import random
import time

sendMessageQueue = collections.deque()

def main():
    # start the WebSocket sending on a separate thread so it doesn't block main
    webSockSendThread = threading.Thread(target=sendWebSockStart)
    webSockSendThread.start()

    while True:
        # make up a random integer and append it to the send queue
        myRandInt = random.randint(1, 10)
        print('appending ' + str(myRandInt))
        sendMessageQueue.append(str(myRandInt))

        # an actual program would have many other activities to do here, use a random sleep to simulate this
        print('doing other stuff')
        time.sleep(random.uniform(1.0, 2.0))
    # end while

# end function

def sendWebSockStart():
    # since we're in a separate thread now, call new_event_loop() (rather than the usual get_event_loop())
    # and set the returned loop as the current loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # instantiate the WebSocket server, note this also connects it to the sendMessages function
    webSocketServer = websockets.serve(sendMessages, 'localhost', 8765)
    # run the webSocketServer forever, which runs the sendMessages function forever
    loop.run_until_complete(webSocketServer)
    loop.run_forever()      # note execution of this separate thread stays on this line forever
# end function

async def sendMessages(websocket, path):
    while True:
        while len(sendMessageQueue) > 0:
            await websocket.send(sendMessageQueue.popleft())
        # end while
    # end while
# end function

if __name__ == '__main__':
    main()

клиент:

# client.py

import threading
import asyncio
import websockets
import collections
import random
import time

receiveMessageQueue = collections.deque()

def main():
    # start the WebSocket receiving on a separate thread so it doesn't block main
    receiveWebSockThread = threading.Thread(target=receiveWebSockStart)
    receiveWebSockThread.start()

    while True:
        # dequeue and print out all the messages in the receive queue currently
        while len(receiveMessageQueue) > 0:
            message = receiveMessageQueue.popleft()
            print('message = ' + str(message))
        # end while

        # an actual program would have many other activities to do here, use a sleep to simulate this
        print('doing other stuff')
        time.sleep(1.0)
    # end while

# end function

def receiveWebSockStart():
    # since we're in a separate thread now, call new_event_loop() rather than the usual get_event_loop()
    loop = asyncio.new_event_loop()
    # run receiveMessages() forever
    loop.run_until_complete(receiveMessages())
    loop.run_forever()      # note execution of this separate thread stays on this line forever
# end function

async def receiveMessages():
    # set up the connection
    uri = 'ws://localhost:8765'
    async with websockets.connect(uri) as webSockConn:
        # endlessly receive messages and append to the queue when received
        while True:
            message = await webSockConn.recv()
            receiveMessageQueue.append(str(message))
        # end while
    # end with
# end function

if __name__ == '__main__':
    main()

вывод сервера:

$ python3 server.py 
appending 10
doing other stuff
appending 6
doing other stuff
appending 2
doing other stuff
appending 8
doing other stuff
appending 2
doing other stuff
appending 3
doing other stuff
appending 9
doing other stuff

вывод клиента:

$ python3 client.py 
doing other stuff
message = 10
message = 6
doing other stuff
doing other stuff
message = 2
doing other stuff
doing other stuff
message = 8
doing other stuff
doing other stuff
message = 2
doing other stuff
doing other stuff
message = 3
doing other stuff
message = 9
doing other stuff

Я использую threading в качестве посредника между main и asyncio websockets функцией. Кто-нибудь может объяснить или еще лучше предоставить рабочий пример того, как переработать это, чтобы использовать run_in_executor для устранения промежуточного потока ??

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...