Отправка данных в асин c класс в python - PullRequest
0 голосов
/ 03 апреля 2020

У меня есть внутренний сервер, который обрабатывает некоторые входящие последовательные данные и запускает сервер веб-сокетов в отдельном классе. Мой вопрос в том, что является хорошим способом обмена данными между основным потоком, а именно, моим Runner и моим Server. Что я сделал для входящих сообщений, так это то, что я передаю подпрограмму обратного вызова на сервер при создании

self.server = threading.Thread(target=Server(self.server_callback), args=(1,))

Таким образом, сервер вызывает соответствующий метод обратного вызова основного потока, но я не знаю, как я могу говорить с бегун на сервер, например, чтобы отправить что-то.

мой runner.py

import logging
import logging.handlers
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(name)-5s] [%(levelname)-3s] %(message)s'
)
import time
from typing import Dict, Pattern, Match, Tuple, List
import yaml
import threading
from server import Server
from utility import Session



# ---------------------------------------------------------------------
class Runner:
    server: Server

    # ---------------------------------------------------------------------
    def __init__(self):
        self.logger = logging.getLogger('Runner')
        Path("logs").mkdir(parents=True, exist_ok=True)
        self.handler = logging.handlers.RotatingFileHandler(
            "logs/out.log", maxBytes=1024 * 1000, backupCount=7
        )
        formatter = logging.Formatter("%(asctime)s [%(name)-5s] [%(levelname)-3s] %(message)s")
        self.handler.setFormatter(formatter)
        self.logger.addHandler(self.handler)

        self.server = threading.Thread(target=Server(self.server_callback), args=(1,))

    def server_callback(self, message):
        self.logger.info("hello from runner, {}".format(message))

и server.py

import asyncio
import json
import logging
import websockets
from typing import Dict, Set


class Server:

    state: Dict = {'value': 0}
    users: Set = set()

    def __init__(self, callback):
        self.logger = logging.getLogger(Server.__name__)
        self.server = websockets.serve(self.counter, "localhost", 5000)
        self.callback = callback
        self.logger.info("Start server")
        asyncio.get_event_loop().run_until_complete(self.server)
        asyncio.get_event_loop().run_forever()

    def test(self):
        self.logger.info("test")

    def state_event(self):
        self.logger.info("state_event")
        return json.dumps({'type': "state", **self.state})

    def user_event(self):
        self.logger.info("user_event")
        return json.dumps({'type': "users", 'count': len(self.users)})

    async def notify_state(self):
        if self.users:
            message = self.state_event()
            await asyncio.wait([user.send(message) for user in self.users])

    async def notify_users(self):
        if self.users:
            message = self.user_event()
            await asyncio.wait([user.send(message) for user in self.users])

    async def register(self, ws):
        self.users.add(ws)
        await self.notify_users()

    async def unregister(self, ws):
        self.users.remove(ws)
        await self.notify_users()

    async def counter(self, ws, path):
        await self.register(ws)
        try:
            await ws.send(self.state_event())
            async for message in ws:
                data = json.loads(message)
                if data['action'] == 'plus':
                    self.state['value'] += 1
                    await self.notify_state()
                else:
                    self.logger.error("unsupported event: {}".format(data))
                    self.callback(data)
        finally:
            await self.unregister(ws)

Я подумал, что мог бы просто позвонить любой серверный метод напрямую от бегуна, но так как это отдельный поток, он не работает. Является ли queue правильным подходом, и если да, для двунаправленной связи, или я должен сохранить подход обратного вызова для входящих сообщений? Я хотел бы иметь возможность вызывать notify_users из своего бэкэнда, а не просто вызывать его как реакцию на входящее сообщение.

Я также попытался запустить функцию отправки в отдельном потоке, который вызвал бы self.notify_users через фиксированный интервал, но это выдает RuntimeWarning: coroutine 'Server.dispatcher' was never awaited.

Обновление:
Так вот мое рабочее решение, но я не уверен, что это правильный путь

def __init__(self, callback, queue):
    self.logger = logging.getLogger(Server.__name__)
    self.server = websockets.serve(self.counter, "localhost", 5000)
    self.callback = callback
    self.queue: Queue = queue
    self.logger.info("Start server")
    dispatcher = threading.Thread(target=self.dispatcher)
    dispatcher.start()
    asyncio.get_event_loop().run_until_complete(self.server)
    asyncio.get_event_loop().run_forever()

Иметь очередь, которая используется совместно с вызывающей стороной, и создать отдельный поток dispatcher, который будет обрабатывать элементы из очереди, а затем вызывать соответствующий метод отправки как async loop

def dispatcher(self):
    while True:
        if not self.queue.empty():
            self.logger.info("dispatch")
            item = self.queue.get()
            asyncio.run(self.notify_users())
        time.sleep(0.5)

Входящие сообщения по-прежнему обрабатываются с использованием функций обратного вызова. Он работает, но не уверен, что это самый простой способ реализовать это, но, скорее всего, он относится к плате Code Review.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...