У меня есть веб-сокет, который я слушаю, который обеспечивает постоянный поток данных. Мне удалось успешно настроить слушателя, чтобы подписаться на эти данные и записать их в консоль, однако я также хочу вернуть эти данные, чтобы я мог отслеживать их, поскольку данные создаются, используя другой метод для завершения процесс, если значение превышено.
В настоящее время я не могу получить доступ к данным с помощью asyncio, я попытался добавить дополнительный для l oop, чтобы вернуть данные вместе с игрой с asyncio.gather () однако, независимо от того, какую конфигурацию я установил, я не могу заставить это возвращать какие-либо данные, где я ошибаюсь? См. Код ниже:
def log_message(self, message: List) -> None:
self.logger.info(f"Data: {message}")
@staticmethod
def filter_message(message) -> List:
msg = dict(json.loads(message))
msg_data = []
if "data" in msg.keys():
for i in msg["data"]:
if len(i[-1]) > 1:
msg_data.append({i[-1]['message_name']: i[-1]['signals']})
return msg_data
async def subscription_handler(self, websocket: websockets.client.WebSocketClientProtocol) -> None:
async for message in websocket:
msg_data = self.filter_message(message)
self.log_message(msg_data)
async def subscriber(self, subscription_payload):
payload = json.dumps(subscription_payload)
async with websockets.connect(self.ws_url) as ws:
await ws.send(payload)
await self.subscription_handler(ws)
async for message in ws:
msg_data = self.filter_message(message)
return msg_data
def run_subscriber(self, payload: dict) -> List:
sub_loop = asyncio.get_event_loop()
data = sub_loop.run_until_complete(asyncio.gather(self.subscriber(payload)))
return data