Может кто-нибудь объяснить или, еще лучше, привести рабочий пример того, как написать Python клиент и сервер, где сервер может инициировать отправку через WebSocket в любое произвольное время на основе обработки logi c и клиента можно получить в любое время? Оба должны быть освобождены для выполнения других операций обработки в качестве цели большинства.
Я много гуглил для этого, в том числе изучал Торнадо , который, кажется, классифицирует c Python option и websockets , и asyncio , которые кажутся более новым способом. В любом случае каждый рабочий пример, который я могу найти, основан на том, что клиент инициирует связь. Некоторые из них охватывают отправку с сервера на клиент в виде эха, когда логика c клиента инициирует серию связи, но мне нужно получить пример работы, где сервер инициирует связь.
Многие Tornado
примеры тайм-аут, который не будет работать в моем случае. Все примеры websockets
и asyncio
представляют собой либо клиент-сервер, и / или имеют строку, подобную somethingOrOther.run()
, которая блокирует выполнение на клиенте или сервере, или и то, и другое. Кроме того, во многих примерах один клиент или сервер написан на Python, а другой - на JavaScript (мне нужны оба в Python).
мне не нужен соединение должно быть полнодуплексным, то есть клиент никогда не должен отправлять серверу на тот же сокет, за исключением, возможно, отправки подтверждения.
Этот вопрос кажется чтобы быть похожим, однако, код автора не является полным, нет принятого ответа, и один предоставленный ответ (не принятый) не содержит полный код, и клиент находится в JavaScript (мне нужны оба в Python ).
Вот пример того, что я пытаюсь сделать, используя обычные сокеты:
сервер:
# server.py
import socket
import time
import random
# module-level variables
HOST='127.0.0.1'
PORT=65439
ACK_TEXT = 'ack_text'
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((HOST, PORT))
sock.listen()
conn, addr = sock.accept()
while True:
# In an actual program some significant processing/logic would happen here that would take time
# and also produce a certain result, depending on which we may or may not need to send a message.
# To simulate this, we'll pause for a short and random amount of time, then get a random integer.
time.sleep(random.uniform(0.5, 1.0))
myRandInt = random.randint(0, 10)
# if the ranom integer is even, send a message
if myRandInt % 2 == 0:
message = str(myRandInt)
print('sending message: ' + message)
sendTextViaSocket(conn, message)
# end if
# end while
# end function
def sendTextViaSocket(conn, message):
# encode the message
encodedMessage = bytes(message, 'utf-8')
# send the encoded message
conn.sendall(encodedMessage)
# receive the encoded acknowledgement
encodedAck = conn.recv(1024)
# decode the acknowledgement
ack = encodedAck.decode('utf-8')
# check the received acknowledgement is correct, if not log an error
if ack == ACK_TEXT:
pass
else:
print('error: acknowledgement was received as ' + str(ack))
# end if
# end function
if __name__ == '__main__':
main()
клиент:
# client
import socket
import select
import time
import random
# module-level variables
HOST='127.0.0.1'
PORT=65439
ACK_TEXT = 'ack_text'
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# if the client is started 1st connect will crash, so continually try to connect
# in a try-catch until the server is available
connectionSuccessful = False
while not connectionSuccessful:
try:
sock.connect((HOST, PORT))
connectionSuccessful = True
except:
pass
# end try
# end while
socks = [ sock ]
# endlessly receive messages
while True:
readySocks, _, _ = select.select(socks, [], [], 0)
for readySock in readySocks:
message = receiveTextViaSocket(readySock)
print('received message = ' + str(message))
# end for
# In an actual program client would have other significant tasks to do
# To simulate this we'll pause for a short but random amount of time
time.sleep(random.uniform(0.5, 1.0))
# end while
# end function
def receiveTextViaSocket(sock):
# receive the encoded message
encodedMessage = sock.recv(1024)
# decode the message
message = encodedMessage.decode('utf-8')
# now send the acknowledgement
# encode the acknowledgement
encodedAck = bytes(ACK_TEXT, 'utf-8')
# send the encoded acknowledgement
sock.sendall(encodedAck)
return message
# end function
if __name__ == '__main__':
main()
Как я могу сделать то же самое, но с WebSockets ??
--- Edit ---
Потратив большую часть сегодняшнего дня на это, это лучшее, что я сделал, так далеко:
сервер:
# server.py
import threading
import asyncio
import websockets
import collections
import random
import time
sendMessageQueue = collections.deque()
def main():
webSocketServer = websockets.serve(sendMessages, 'localhost', 8765)
myLoop = asyncio.get_event_loop()
webSockThread = threading.Thread(target=webSockStart, args=(webSocketServer, myLoop,))
webSockThread.start()
while True:
myRandInt = random.randint(1, 10)
print('appending ' + str(myRandInt))
sendMessageQueue.append(str(myRandInt))
print('sendMessageQueue = ' + str(sendMessageQueue))
time.sleep(random.uniform(1.0, 2.0))
# end while
# end function
def webSockStart(webSocketServer, myLoop):
myLoop.run_until_complete(webSocketServer)
myLoop.run_forever()
# end function
async def sendMessages(websocket, path):
while len(sendMessageQueue) > 0:
await websocket.send(sendMessageQueue.popleft())
# end while
# end function
if __name__ == '__main__':
main()
клиент:
# client.py
import threading
import asyncio
import websockets
import collections
import random
import time
receiveMessageQueue = collections.deque()
def main():
receiveWebSockThread = threading.Thread(target=receiveWebSockStart)
receiveWebSockThread.start()
while True:
print('doing other stuff')
time.sleep(1.0)
while len(receiveMessageQueue) > 0:
message = receiveMessageQueue.popleft()
print('message = ' + str(message))
# end while
# end while
# end function
def receiveWebSockStart():
loop = asyncio.new_event_loop()
loop.run_until_complete(receiveMessages())
loop.run_forever()
# end function
async def receiveMessages():
while True:
uri = 'ws://localhost:8765'
async with websockets.connect(uri) as webSockConn:
message = await webSockConn.recv()
receiveMessageQueue.append(str(message))
# while True:
# end while
# end with
# end function
if __name__ == '__main__':
main()
выход сервера: * 10 43 *
$ python3 server.py
appending 3
sendMessageQueue = deque(['3'])
appending 8
sendMessageQueue = deque(['8'])
appending 8
sendMessageQueue = deque(['8', '8'])
appending 6
sendMessageQueue = deque(['8', '8', '6'])
appending 1
sendMessageQueue = deque(['8', '8', '6', '1'])
вывод клиента:
$ python3 client.py
doing other stuff
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "client.py", line 30, in receiveWebSockStart
loop.run_until_complete(receiveMessages())
File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "client.py", line 38, in receiveMessages
message = await webSockConn.recv()
File "/usr/local/lib/python3.6/dist-packages/websockets/protocol.py", line 509, in recv
await self.ensure_open()
File "/usr/local/lib/python3.6/dist-packages/websockets/protocol.py", line 812, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason
message = 3
doing other stuff
doing other stuff
doing other stuff
doing other stuff
Сервер, кажется, работает должным образом, однако клиент отключается после получения одного сообщения, и я не уверен, почему или как его решить , Я не уверен, что потоки - даже правильный подход, или я должен был использовать run_in_executor
от concurrent.futures
, но я не мог заставить это работать вообще. Любой, у кого есть определенные c знания по этой теме c, пожалуйста, помогите.
--- Edit2 ---
Ну, по крайней мере, я нашел ответ, который работает (проверено с Python 3.6.9 в Ubuntu 18.04), но я действительно недоволен этим:
сервер:
# server.py
import threading
import asyncio
import websockets
import collections
import random
import time
sendMessageQueue = collections.deque()
def main():
# start the WebSocket sending on a separate thread so it doesn't block main
webSockSendThread = threading.Thread(target=sendWebSockStart)
webSockSendThread.start()
while True:
# make up a random integer and append it to the send queue
myRandInt = random.randint(1, 10)
print('appending ' + str(myRandInt))
sendMessageQueue.append(str(myRandInt))
# an actual program would have many other activities to do here, use a random sleep to simulate this
print('doing other stuff')
time.sleep(random.uniform(1.0, 2.0))
# end while
# end function
def sendWebSockStart():
# since we're in a separate thread now, call new_event_loop() (rather than the usual get_event_loop())
# and set the returned loop as the current loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# instantiate the WebSocket server, note this also connects it to the sendMessages function
webSocketServer = websockets.serve(sendMessages, 'localhost', 8765)
# run the webSocketServer forever, which runs the sendMessages function forever
loop.run_until_complete(webSocketServer)
loop.run_forever() # note execution of this separate thread stays on this line forever
# end function
async def sendMessages(websocket, path):
while True:
while len(sendMessageQueue) > 0:
await websocket.send(sendMessageQueue.popleft())
# end while
# end while
# end function
if __name__ == '__main__':
main()
клиент:
# client.py
import threading
import asyncio
import websockets
import collections
import random
import time
receiveMessageQueue = collections.deque()
def main():
# start the WebSocket receiving on a separate thread so it doesn't block main
receiveWebSockThread = threading.Thread(target=receiveWebSockStart)
receiveWebSockThread.start()
while True:
# dequeue and print out all the messages in the receive queue currently
while len(receiveMessageQueue) > 0:
message = receiveMessageQueue.popleft()
print('message = ' + str(message))
# end while
# an actual program would have many other activities to do here, use a sleep to simulate this
print('doing other stuff')
time.sleep(1.0)
# end while
# end function
def receiveWebSockStart():
# since we're in a separate thread now, call new_event_loop() rather than the usual get_event_loop()
loop = asyncio.new_event_loop()
# run receiveMessages() forever
loop.run_until_complete(receiveMessages())
loop.run_forever() # note execution of this separate thread stays on this line forever
# end function
async def receiveMessages():
# set up the connection
uri = 'ws://localhost:8765'
async with websockets.connect(uri) as webSockConn:
# endlessly receive messages and append to the queue when received
while True:
message = await webSockConn.recv()
receiveMessageQueue.append(str(message))
# end while
# end with
# end function
if __name__ == '__main__':
main()
вывод сервера:
$ python3 server.py
appending 10
doing other stuff
appending 6
doing other stuff
appending 2
doing other stuff
appending 8
doing other stuff
appending 2
doing other stuff
appending 3
doing other stuff
appending 9
doing other stuff
вывод клиента:
$ python3 client.py
doing other stuff
message = 10
message = 6
doing other stuff
doing other stuff
message = 2
doing other stuff
doing other stuff
message = 8
doing other stuff
doing other stuff
message = 2
doing other stuff
doing other stuff
message = 3
doing other stuff
message = 9
doing other stuff
Я использую threading
в качестве посредника между main
и asyncio
websockets
функцией. Кто-нибудь может объяснить или еще лучше предоставить рабочий пример того, как переработать это, чтобы использовать run_in_executor
для устранения промежуточного потока ??