У меня есть внутренний сервер, который обрабатывает некоторые входящие последовательные данные и запускает сервер веб-сокетов в отдельном классе. Мой вопрос в том, что является хорошим способом обмена данными между основным потоком, а именно, моим Runner
и моим Server
. Что я сделал для входящих сообщений, так это то, что я передаю подпрограмму обратного вызова на сервер при создании
self.server = threading.Thread(target=Server(self.server_callback), args=(1,))
Таким образом, сервер вызывает соответствующий метод обратного вызова основного потока, но я не знаю, как я могу говорить с бегун на сервер, например, чтобы отправить что-то.
мой runner.py
import logging
import logging.handlers
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(name)-5s] [%(levelname)-3s] %(message)s'
)
import time
from typing import Dict, Pattern, Match, Tuple, List
import yaml
import threading
from server import Server
from utility import Session
# ---------------------------------------------------------------------
class Runner:
server: Server
# ---------------------------------------------------------------------
def __init__(self):
self.logger = logging.getLogger('Runner')
Path("logs").mkdir(parents=True, exist_ok=True)
self.handler = logging.handlers.RotatingFileHandler(
"logs/out.log", maxBytes=1024 * 1000, backupCount=7
)
formatter = logging.Formatter("%(asctime)s [%(name)-5s] [%(levelname)-3s] %(message)s")
self.handler.setFormatter(formatter)
self.logger.addHandler(self.handler)
self.server = threading.Thread(target=Server(self.server_callback), args=(1,))
def server_callback(self, message):
self.logger.info("hello from runner, {}".format(message))
и server.py
import asyncio
import json
import logging
import websockets
from typing import Dict, Set
class Server:
state: Dict = {'value': 0}
users: Set = set()
def __init__(self, callback):
self.logger = logging.getLogger(Server.__name__)
self.server = websockets.serve(self.counter, "localhost", 5000)
self.callback = callback
self.logger.info("Start server")
asyncio.get_event_loop().run_until_complete(self.server)
asyncio.get_event_loop().run_forever()
def test(self):
self.logger.info("test")
def state_event(self):
self.logger.info("state_event")
return json.dumps({'type': "state", **self.state})
def user_event(self):
self.logger.info("user_event")
return json.dumps({'type': "users", 'count': len(self.users)})
async def notify_state(self):
if self.users:
message = self.state_event()
await asyncio.wait([user.send(message) for user in self.users])
async def notify_users(self):
if self.users:
message = self.user_event()
await asyncio.wait([user.send(message) for user in self.users])
async def register(self, ws):
self.users.add(ws)
await self.notify_users()
async def unregister(self, ws):
self.users.remove(ws)
await self.notify_users()
async def counter(self, ws, path):
await self.register(ws)
try:
await ws.send(self.state_event())
async for message in ws:
data = json.loads(message)
if data['action'] == 'plus':
self.state['value'] += 1
await self.notify_state()
else:
self.logger.error("unsupported event: {}".format(data))
self.callback(data)
finally:
await self.unregister(ws)
Я подумал, что мог бы просто позвонить любой серверный метод напрямую от бегуна, но так как это отдельный поток, он не работает. Является ли queue
правильным подходом, и если да, для двунаправленной связи, или я должен сохранить подход обратного вызова для входящих сообщений? Я хотел бы иметь возможность вызывать notify_users
из своего бэкэнда, а не просто вызывать его как реакцию на входящее сообщение.
Я также попытался запустить функцию отправки в отдельном потоке, который вызвал бы self.notify_users
через фиксированный интервал, но это выдает RuntimeWarning: coroutine 'Server.dispatcher' was never awaited
.
Обновление:
Так вот мое рабочее решение, но я не уверен, что это правильный путь
def __init__(self, callback, queue):
self.logger = logging.getLogger(Server.__name__)
self.server = websockets.serve(self.counter, "localhost", 5000)
self.callback = callback
self.queue: Queue = queue
self.logger.info("Start server")
dispatcher = threading.Thread(target=self.dispatcher)
dispatcher.start()
asyncio.get_event_loop().run_until_complete(self.server)
asyncio.get_event_loop().run_forever()
Иметь очередь, которая используется совместно с вызывающей стороной, и создать отдельный поток dispatcher
, который будет обрабатывать элементы из очереди, а затем вызывать соответствующий метод отправки как async loop
def dispatcher(self):
while True:
if not self.queue.empty():
self.logger.info("dispatch")
item = self.queue.get()
asyncio.run(self.notify_users())
time.sleep(0.5)
Входящие сообщения по-прежнему обрабатываются с использованием функций обратного вызова. Он работает, но не уверен, что это самый простой способ реализовать это, но, скорее всего, он относится к плате Code Review
.