Веб-сокеты Python (asyncio ver) принудительно закрыть соединение - PullRequest
0 голосов
/ 29 июня 2018

Я пишу для Python> 3,5. Я использую библиотеку Websockets 6.0, которая находится здесь: https://github.com/aaugustin/websockets Я называю их asyncio веб-сокетами, так как они основаны на asyncio В моем поиске было много "потерянных соединений", но я смотрю, как отменить текущий ws.recv(). Вызов .start() создает вспомогательный поток для запуска цикла событий asynico. Затем функция приема запускается и вызывает функцию соединения, и создается экземпляр websocket ws. Тогда функции приема работают, падают сообщения. Когда я готов остановиться, вызывается .stop (). Я ожидал, что функция останова остановит ожидаемое ws.recv(). Затем, установив флаг keep_running в значение false и запустив ws.close(), я ожидал, что ws.recv() завершится, а цикл when keep_running закончится. Это не то, что происходит. Я вижу все три остановки, но никогда receive stop.

command is: stop
Do command is stopped
Stop 1
Stop 2
Stop 3
^CException ignored in: <module 'threading' from '/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py'>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1294, in _shutdown
    t.join()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
(pyalmondplus) Pauls-MBP:pyalmondplus paulenright$ 

Код для справки:

import threading
import asyncio
import websockets
import json

class PyAlmondPlus:
    def __init__(self, api_url, event_callback=None):
        self.api_url = api_url
        self.ws = None
        self.loop = asyncio.get_event_loop()
        self.receive_task = None
        self.event_callback = event_callback
        self.keep_running = False

    async def connect(self):
        print("connecting")
        if self.ws is None:
            print("opening socket")
            self.ws = await websockets.connect(self.api_url)
        print(self.ws)

    async def disconnect(self):
        pass

    async def send(self, message):
        pass

    async def receive(self):
        print("receive started")
        while self.keep_running:
            if self.ws is None:
                await self.connect()
            recv_data = await self.ws.recv()
            print(recv_data)
        print("receive ended")

    def start(self):
        self.keep_running = True
        print("Start 1")
        print("Start 2")
        t = threading.Thread(target=self.start_loop, args=())
        print("Start 3")
        t.start()
        print("Receiver running")

    def start_loop(self):
        print("Loop helper 1")
        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop(policy.new_event_loop())
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(True)
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.receive())
        print("Loop helper 2")

    def stop(self):
        print("Stop 1")
        self.keep_running = False
        print("Stop 2")
        self.ws.close()
        print("Stop 3")

1 Ответ

0 голосов
/ 30 июня 2018

Я смотрю, как отменить текущий ws.recv () [...] Я вижу все три остановки, но никогда не принимаю остановку.

Ваша подпрограмма receive, скорее всего, приостановлена ​​в ожидании поступления некоторых данных, поэтому она не может проверить флаг keep_running.

Простой и надежный способ остановить запущенную сопрограмму - cancel asyncio Task, которая управляет им. Это немедленно отстранит сопрограмму и заставит все, что она ждала, поднять CancelledError. При использовании отмены вам не нужен флаг keep_running, исключение автоматически завершит цикл.

Вызов .start () создает вспомогательный поток для запуска цикла событий asynico.

Это работает, но вам на самом деле не нужен новый поток и совершенно новый цикл событий для каждого экземпляра PyAlmondPlus. Asyncio предназначен для работы внутри одного потока, поэтому один экземпляр цикла событий может содержать любое количество сопрограмм.

Вот возможный дизайн, который реализует обе идеи (не проверенные с настоящими веб-сокетами):

# pre-start a single thread that runs the asyncio event loop
bgloop = asyncio.new_event_loop()
_thread = threading.Thread(target=bgloop.run_forever)
_thread.daemon = True
_thread.start()

class PyAlmondPlus:
    def __init__(self, api_url):
        self.api_url = api_url
        self.ws = None

    async def connect(self):
        if self.ws is None:
            self.ws = await websockets.connect(self.api_url)

    async def receive(self):
        # keep_running is not needed - cancel the task instead
        while True:
            if self.ws is None:
                await self.connect()
            recv_data = await self.ws.recv()

    async def init_receive_task(self):
        self.receive_task = bgloop.create_task(self.receive())

    def start(self):
        # use run_coroutine_threadsafe to safely submit a coroutine
        # to the event loop running in a different thread
        init_done = asyncio.run_coroutine_threadsafe(
            self.init_receive_task(), bgloop)
        # wait for the init coroutine to actually finish
        init_done.result()

    def stop(self):
        # Cancel the running task. Since the event loop is in a
        # background thread, request cancellation with
        # call_soon_threadsafe.
        bgloop.call_soon_threadsafe(self.receive_task.cancel)
...