Python: веб-сокеты в синхронной программе - PullRequest
0 голосов
/ 05 июля 2018

У меня есть стандартная синхронная программа на Python, которая должна иметь возможность считывать данные из веб-сокетов и обновлять графический интерфейс с данными. Тем не менее, асинцио постоянно меня сбивает с толку.

Как мне сделать модуль, который:

  1. принимает несколько подписок на несколько источников
  2. отправляет обновление запрашивающей стороне, когда есть данные
  3. открывает ровно одно подключение к веб-сокету на URL
  4. сбрасывает веб-сокет, если он закрывается

Вот то, что у меня уже есть, но во многих случаях оно терпит неудачу:

  1. run_forever() означает, что цикл застревает до завершения подписки, а затем handle() застревает в цикле Falsey while
  2. похоже, что он не хочет перезапускать сокеты, когда они выключены, потому что объект websockets не имеет свойства connected (websocket без s имеет, но я не понимаю различий и не могу найти информация онлайн либо)
  3. Я абсолютно не уверен, правильно ли подходит мой подход.

Бились с этим неделями. Был бы признателен за некоторые указатели.

class WSClient():
    subscriptions = set()
    connections = {}
    started = False

    def __init__(self):
        self.loop = asyncio.get_event_loop()

    def start(self):
        self.started = True
        self.loop.run_until_complete(self.handle())
        self.loop.run_until_forever()  # problematic, because it does not allow new subscribe() events

    async def handle(self):
        while len(self.connections) > 0:
            # listen to every websocket
            futures = [self.listen(self.connections[url]) for url in self.connections]
            done, pending = await asyncio.wait(futures)

            # the following is apparently necessary to avoid warnings
            # about non-retrieved exceptions etc
            try:
                data, ws = done.pop().result()
            except Exception as e:
                print("OTHER EXCEPTION", e)

            for task in pending:
                task.cancel()

    async def listen(self, ws):
        try:
            async for data in ws:
                data = json.loads(data)
                # call the subscriber (listener) back when there's data
                [s.listener._handle_result(data) for s in self.subscriptions if s.ws == ws]
        except Exception as e:
            print('ERROR LISTENING; RESTARTING SOCKET', e)
            await asyncio.sleep(2)
            self.restart_socket(ws)

    def subscribe(self, subscription):
        task = self.loop.create_task(self._subscribe(subscription))
        asyncio.gather(task)

        if not self.started:
            self.start()

    async def _subscribe(self, subscription):
        try:
            ws = self.connections.get(subscription.url, await websockets.connect(subscription.url))
            await ws.send(json.dumps(subscription.sub_msg))

            subscription.ws = ws
            self.connections[subscription.url] = ws
            self.subscriptions.add(subscription)
        except Exception as e:
            print("ERROR SUBSCRIBING; RETRYING", e)
            await asyncio.sleep(2)
            self.subscribe(subscription)

    def restart_socket(self, ws):
        for s in self.subscriptions:
            if s.ws == ws and not s.ws.connected:
                print(s)
                del self.connections[s.url]
                self.subscribe(s)

Ответы [ 2 ]

0 голосов
/ 21 июля 2018

Вы можете попробовать торнадо и скрученный автобан для веб-розеток.

0 голосов
/ 19 июля 2018

У меня есть стандартная синхронная программа на Python, которая должна иметь возможность считывать данные из веб-сокетов и обновлять графический интерфейс с данными. Тем не менее, асинцио постоянно меня сбивает с толку.

Как вы упомянули GUI, тогда это, вероятно, не "стандартная синхронная программа на Python". Обычно программа с графическим интерфейсом пользователя имеет неблокирующий управляемый событиями основной поток, который допускает одновременное поведение пользователей и обратные вызовы. Это очень похоже на asyncio, и обычно asyncio обычно работает вместе с GUI для использования специфичного для GUI цикла событий для замены цикла событий по умолчанию в asyncio , так что ваши сопрограммы asyncio просто запускаются в цикле событий GUI, и вы можете избежать вызова run_forever(), блокирующего все.

Альтернативный способ - запустить цикл событий asyncio в отдельном потоке, чтобы ваша программа могла одновременно ожидать данные веб-сокета и ждать щелчков пользователя. Я переписал ваш код следующим образом:

import asyncio
import threading
import websockets
import json


class WSClient(threading.Thread):

    def __init__(self):
        super().__init__()
        self._loop = None
        self._tasks = {}
        self._stop_event = None

    def run(self):
        self._loop = asyncio.new_event_loop()
        self._stop_event = asyncio.Event(loop=self._loop)
        try:
            self._loop.run_until_complete(self._stop_event.wait())
            self._loop.run_until_complete(self._clean())
        finally:
            self._loop.close()

    def stop(self):
        self._loop.call_soon_threadsafe(self._stop_event.set)

    def subscribe(self, url, sub_msg, callback):
        def _subscribe():
            if url not in self._tasks:
                task = self._loop.create_task(
                    self._listen(url, sub_msg, callback))
                self._tasks[url] = task

        self._loop.call_soon_threadsafe(_subscribe)

    def unsubscribe(self, url):
        def _unsubscribe():
            task = self._tasks.pop(url, None)
            if task is not None:
                task.cancel()

        self._loop.call_soon_threadsafe(_unsubscribe)

    async def _listen(self, url, sub_msg, callback):
        try:
            while not self._stop_event.is_set():
                try:
                    ws = await websockets.connect(url, loop=self._loop)
                    await ws.send(json.dumps(sub_msg))
                    async for data in ws:
                        data = json.loads(data)

                        # NOTE: please make sure that `callback` won't block,
                        # and it is allowed to update GUI from threads.
                        # If not, you'll need to find a way to call it from
                        # main/GUI thread (similar to `call_soon_threadsafe`)
                        callback(data)
                except Exception as e:
                    print('ERROR; RESTARTING SOCKET IN 2 SECONDS', e)
                    await asyncio.sleep(2, loop=self._loop)
        finally:
            self._tasks.pop(url, None)

    async def _clean(self):
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), loop=self._loop)
...