Sanic Webserver: обработчик Websocket закрывает сокет при возврате; зацикливание прерывает другие обработчики запросов - PullRequest
2 голосов
/ 18 июня 2019

Сценарий: у меня есть здравый веб-сервер, обслуживающий простой веб-сайт. Сайт представляет собой большую таблицу данных в формате html с поддержкой шаблонов vue. Поскольку записи в таблице меняются каждые несколько минут, данные передаются через websocket при изменении. Около 2000 пользователей одновременно. Я попытался реализовать архитектуру pub / sub.

Проблема: мои веб-сокеты закрываются, как только мой обработчик sanic возвращается. Я мог бы иметь петлю внутри, чтобы держать обработчик открытым. Но держать 2000 открытых обработчиков звучит как плохая идея ... Также открытые обработчики ведут себя странно. Один поток или небольшой пул потоков должен делать эту работу. Возможно, я неправильно понял санитарную документацию и нуждаюсь в совете по дизайну.

Вещи, которые я пробовал: - увеличить время ожидания, чтобы оно было достаточно высоким - попробовать различные другие настройки веб-сокета в Sanic - позвольте моей стороне клиента js вернуть ложное сообщение ( Закрытие Javascript сразу после открытия ) - установить для ссылки ws значение null после ее передачи

Индекс Sanic Webserver:

@app.route('/')
async def serve_index(request):
    return await file(os.path.join(os.path.dirname(__file__), 'index.html'))

JS Index.html:

var app = new Vue({
    el: '#app',
        data() {
            manydata0: 0,
            manydata1: 0,
            ws: null,
        }
    },
    methods: {
        update: function (json_data) {
            json = JSON.parse(json_data);
            this.manydata0 = json['data0'];
            this.manydata1 = json['data1'];
        }
    },
    created: function () {
        this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
        messages = document.createElement('ul');
        this.ws.onmessage = function (event) {
            console.log("new data")
            app.update(event.data);
        return false;
    };
    document.body.appendChild(messages);
    this.ws.onclose = function (event) {
        console.log("closed :(")
    };

Обработчик веб-сокета Sanic Webserver (1-я версия, сокеты сразу умирают):

@app.websocket('/reload')
async def feed(request, ws):
    #time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
    await ws.send(Path(json).read_text()) # serve initial data
    connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates

Обработчик веб-сокетов Sanic Webservers (2-я версия, Обработчик блокирует другие обработчики запросов)

@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
        except Exception as e:
            print("Exception while checking file: ", e)
    # this stops the server to handle other @app.routes like css, fonts, favicon

Обработчик веб-сокетов Sanic Webservers (3-я версия, ненужный recv ())

@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
                await recv() # if the client sends from time to time all is fine
        except Exception as e:
            print("Exception while checking file: ", e)

Последние два фрагмента кода мало чем отличаются. Я добавляю ws.recv () и отправляю подходящие вещи со стороны клиента (например, с интервалом), тогда все работает. Затем отправляются css, шрифты и favicon. Но это не может быть предназначено, не так ли? Это не должно хорошо масштабироваться, верно?

В целом, это не имеет особого смысла для меня. Что я недопонимаю?

1 Ответ

0 голосов
/ 19 июня 2019

один из Sanic core-devs здесь.

Во-первых, для примера архитектуры типа pubsub, вот суть , которую я подготовил.Я думаю, что это могло бы помочь.

Моя основная идея - создать один Feed объект, который зацикливается на своей собственной задаче, ища событие.В моем случае это получение информации от pubsub.В вашем случае следует проверить время в документе JSON.

Затем, когда для этого Feed.receiver сработало событие, оно отправляет запрос всем прослушивающим клиентам.

Внутри самого обработчика websocket вы хотите оставить его открытым.Если вы этого не сделаете, то соединение будет закрыто.Если вы не заботитесь о получении информации от клиента, вам не нужно использовать await recv().


Так что, в вашем случае, используя SUPER простую логику, я бывыполните что-то вроде следующего.

Это непроверенный код, может потребоваться некоторая настройка

import os
import random
import string
from functools import partial
from pathlib import Path

from sanic import Sanic

import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set

app = Sanic(__name__)

FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20


def generate_code(length=12, include_punctuation=False):
    characters = string.ascii_letters + string.digits
    if include_punctuation:
        characters += string.punctuation
    return "".join(random.choice(characters) for x in range(length))


@dataclass
class Client:
    interface: websockets.server.WebSocketServerProtocol = field(repr=False)
    sid: str = field(default_factory=partial(generate_code, 36))

    def __hash__(self):
        return hash(str(self))

    async def keep_alive(self) -> None:
        while True:
            try:
                try:
                    pong_waiter = await self.interface.ping()
                    await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
                except asyncio.TimeoutError:
                    print("NO PONG!!")
                    await self.feed.unregister(self)
                else:
                    print(f"ping: {self.sid} on <{self.feed.name}>")
                await asyncio.sleep(INTERVAL)
            except websockets.exceptions.ConnectionClosed:
                print(f"broken connection: {self.sid} on <{self.feed.name}>")
                await self.feed.unregister(self)
                break

    async def shutdown(self) -> None:
        self.interface.close()

    async def run(self) -> None:
        try:
            self.feed.app.add_task(self.keep_alive())
            while True:
                pass
        except websockets.exceptions.ConnectionClosed:
            print("connection closed")
        finally:
            await self.feed.unregister(self)


class Feed:
    app: Sanic
    clients: Set[Client]
    cached = None

    def __init__(self, app: Sanic):
        self.clients = set()
        self.app = app

    @classmethod
    async def get(cls, app: Sanic):
        is_existing = False

        if cls.cached:
            is_existing = True
            feed = cls.cached
        else:
            feed = cls(app)
            cls.cached = feed

        if not is_existing:
            feed.app.add_task(feed.receiver())

        return feed, is_existing

    async def receiver(self) -> None:
        print("Feed receiver started")
        mod_time = 0
        while True:
            try:
                stat = os.stat(FILE)
                print(f"times: {mod_time} | {stat.st_mtime}")
                if mod_time != stat.st_mtime:
                    content = self.get_file_contents()
                    for client in self.clients:
                        try:
                            print(f"\tSending to {client.sid}")
                            await client.interface.send(content)
                        except websockets.exceptions.ConnectionClosed:
                            print(f"ConnectionClosed. Client {client.sid}")
            except Exception as e:
                print("Exception while checking file: ", e)

    async def register(
        self, websocket: websockets.server.WebSocketServerProtocol
    ) -> Optional[Client]:
        client = Client(interface=websocket)
        print(f">>> register {client}")

        client.feed = self
        self.clients.add(client)

        # Send initial content
        content = self.get_file_contents()
        client.interface.send(content)

        print(f"\nAll clients\n{self.clients}\n\n")

        return client

    async def unregister(self, client: Client) -> None:
        print(f">>> unregister {client} on <{self.name}>")
        if client in self.clients:
            await client.shutdown()
            self.clients.remove(client)
            print(f"\nAll remaining clients\n{self.clients}\n\n")

    def get_file_contents(self):
        return Path(FILE).read_text()


@app.websocket("/reload")
async def feed(request, ws):
    feed, is_existing = await Feed.get(app)

    client = await feed.register(ws)
    await client.run()


if __name__ == "__main__":
    app.run(debug=True, port=7777)
...