Получение возвращенных данных с помощью веб-сокетов и AsyncIO - PullRequest
0 голосов
/ 18 июня 2020

У меня есть веб-сокет, который я слушаю, который обеспечивает постоянный поток данных. Мне удалось успешно настроить слушателя, чтобы подписаться на эти данные и записать их в консоль, однако я также хочу вернуть эти данные, чтобы я мог отслеживать их, поскольку данные создаются, используя другой метод для завершения процесс, если значение превышено.

В настоящее время я не могу получить доступ к данным с помощью asyncio, я попытался добавить дополнительный для l oop, чтобы вернуть данные вместе с игрой с asyncio.gather () однако, независимо от того, какую конфигурацию я установил, я не могу заставить это возвращать какие-либо данные, где я ошибаюсь? См. Код ниже:

    def log_message(self, message: List) -> None:
        self.logger.info(f"Data: {message}")

    @staticmethod
    def filter_message(message) -> List:
        msg = dict(json.loads(message))
        msg_data = []
        if "data" in msg.keys():
            for i in msg["data"]:
                if len(i[-1]) > 1:
                    msg_data.append({i[-1]['message_name']: i[-1]['signals']})
        return msg_data

    async def subscription_handler(self, websocket: websockets.client.WebSocketClientProtocol) -> None:
        async for message in websocket:
            msg_data = self.filter_message(message)
            self.log_message(msg_data)

    async def subscriber(self, subscription_payload):
        payload = json.dumps(subscription_payload)
        async with websockets.connect(self.ws_url) as ws:
            await ws.send(payload)
            await self.subscription_handler(ws)
            async for message in ws:
                msg_data = self.filter_message(message)
                return msg_data

    def run_subscriber(self, payload: dict) -> List:
        sub_loop = asyncio.get_event_loop()
        data = sub_loop.run_until_complete(asyncio.gather(self.subscriber(payload)))
        return data
...