Я пишу для Python> 3,5.
Я использую библиотеку Websockets 6.0, которая находится здесь:
https://github.com/aaugustin/websockets
Я называю их asyncio
веб-сокетами, так как они основаны на asyncio
В моем поиске было много "потерянных соединений", но я смотрю, как отменить текущий ws.recv()
.
Вызов .start()
создает вспомогательный поток для запуска цикла событий asynico. Затем функция приема запускается и вызывает функцию соединения, и создается экземпляр websocket ws
. Тогда функции приема работают, падают сообщения. Когда я готов остановиться, вызывается .stop (). Я ожидал, что функция останова остановит ожидаемое ws.recv()
. Затем, установив флаг keep_running в значение false и запустив ws.close()
, я ожидал, что ws.recv()
завершится, а цикл when keep_running закончится. Это не то, что происходит. Я вижу все три остановки, но никогда receive stop
.
command is: stop
Do command is stopped
Stop 1
Stop 2
Stop 3
^CException ignored in: <module 'threading' from '/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py'>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1294, in _shutdown
t.join()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
(pyalmondplus) Pauls-MBP:pyalmondplus paulenright$
Код для справки:
import threading
import asyncio
import websockets
import json
class PyAlmondPlus:
def __init__(self, api_url, event_callback=None):
self.api_url = api_url
self.ws = None
self.loop = asyncio.get_event_loop()
self.receive_task = None
self.event_callback = event_callback
self.keep_running = False
async def connect(self):
print("connecting")
if self.ws is None:
print("opening socket")
self.ws = await websockets.connect(self.api_url)
print(self.ws)
async def disconnect(self):
pass
async def send(self, message):
pass
async def receive(self):
print("receive started")
while self.keep_running:
if self.ws is None:
await self.connect()
recv_data = await self.ws.recv()
print(recv_data)
print("receive ended")
def start(self):
self.keep_running = True
print("Start 1")
print("Start 2")
t = threading.Thread(target=self.start_loop, args=())
print("Start 3")
t.start()
print("Receiver running")
def start_loop(self):
print("Loop helper 1")
policy = asyncio.get_event_loop_policy()
policy.set_event_loop(policy.new_event_loop())
self.loop = asyncio.get_event_loop()
self.loop.set_debug(True)
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.receive())
print("Loop helper 2")
def stop(self):
print("Stop 1")
self.keep_running = False
print("Stop 2")
self.ws.close()
print("Stop 3")