Как создать отправленные сервером события для уведомлений об изменении статуса в веб-приложении Python? - PullRequest
2 голосов
/ 25 апреля 2020

У меня есть веб-приложение, написанное на CherryPy: пользователь загружает файл, затем начинается длительная операция, проходящая через несколько этапов. Я хочу, чтобы уведомления об этих этапах передавались всем подключенным клиентам. Но я не знаю, как общаться между процессами. Полагаю, мне пришлось бы запускать длительную операцию в отдельном процессе, но тогда я не знаю, как передать сообщения «продвинутый на этап N» в «функцию отправки на сервер».

Концептуально, это будет примерно так:

SSEtest.py:

from pathlib import Path
from time import sleep
import cherrypy


def lengthy_operation(name, stream):
    for stage in range(10):
        print(f'stage {stage}... ', end='')
        sleep(2)
        print('done')
    print('finished')


class SSETest():

    @cherrypy.expose
    def index(self):
        return Path('SSEtest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Content-Type'] = 'text/event-stream;charset=utf-8'

        def lengthy_operation():
            for stage in range(5):
                yield f'data: stage {stage}... \n\n'
                sleep(2)
                yield 'data: done\n\n'
            yield 'data: finished\n\n'

        return lengthy_operation()

    stage._cp_config = {'response.stream': True, 'tools.encode.encoding': 'utf-8'}


cherrypy.quickstart(SSETest())

SSEtest. html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>SSE Test</title>
</head>
<body>
<h1>SSE Test</h1>
<div>
    <form id="load_file_form" action="" enctype="multipart/form-data">
        <label for="load_file">Load a file: </label>
        <input type="file" id="load_file" name="load_file">
        <progress max="100" value="0" id="progress_bar"></progress>
    </form>
</div>

<div id="status_messages">
<h3>Stages:</h3>
</div>

<script>
    const load_file = document.getElementById('load_file');
    const progress_bar = document.getElementById('progress_bar');

    function update_progress_bar(event) {
        if (event.lengthComputable) {
            progress_bar.value = Math.round((event.loaded/event.total)*100);
        }
    }

    load_file.onchange = function (event) {
        let the_file = load_file.files[0];
        let formData = new FormData();
        let connection = new XMLHttpRequest();

        formData.append('file', the_file, the_file.name);

        connection.open('POST', 'upload', true);
        connection.upload.onprogress = update_progress_bar;
        connection.onload = function (event) {
            if (connection.status != 200) {
                alert('Error! ' + event);
            }
        };

        connection.send(formData);
    };

    const status_messages = document.getElementById("status_messages");
    const sse = new EventSource("stage");

    sse.onopen = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = "Connection established: " + event.type;
        status_messages.appendChild(new_message);
    };

    sse.onmessage = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = event.data;
        status_messages.appendChild(new_message);
    };

    sse.onerror = function(event) {
        let new_message = document.createElement("p");
        if (event.readyState == EventSource.CLOSED) {
            new_message.innerHTML = "Connections closed";
        } else {
            new_message.innerHTML = "Error: " + event.type;
        }
        status_messages.appendChild(new_message);
    };

</script>
</body>
</html>

Мне нужно, чтобы lengthy_operation() вызывался только один раз, когда файл загружен. И сгенерированные им сообщения должны быть отправлены всем клиентам. Теперь он работает с локальной функцией, а это не то, что я хочу. Как я могу использовать внешнюю функцию и передать ее сообщения в метод stage()?

1 Ответ

2 голосов
/ 26 апреля 2020

Я хочу, чтобы уведомления об этих этапах передавались всем подключенным клиентам.

Я подозреваю, что в конце концов вам понадобится больше контроля, но я отвечу на ваш вопрос как это спросили. Позже вы можете захотеть использовать приведенный ниже пример и отфильтровать передаваемые уведомления на основе сеанса пользователя или на основе определенной начальной отметки времени или какой-либо другой соответствующей концепции.

Каждый «подключенный клиент» фактически зависает на длительный запрос к /stage, который сервер будет использовать для потоковой передачи событий клиенту. В вашем примере каждый клиент немедленно начнет этот запрос и оставит его открытым, пока сервер не завершит поток. Вы также можете закрыть поток от клиента, используя close() на EventSource.

Basi c Solution

Вы спросили, как получить /stage Обработчик транслирует или зеркалирует свои события всем подключенным в данный момент клиентам. Есть много способов сделать это sh, но вкратце вы хотите, чтобы функция lengthy_operation отправляла события всем читателям обработчиков /stage или в постоянную общую папку, из которой читали все обработчики /stage. Я покажу способ инкапсуляции первой идеи, описанной выше.

Рассмотрим обобщенный c потоковый класс событий, который сериализуется в data: <some message>:

class StreamEvent:
    def __init__(self, message: str) -> bytes:
        self.message = message

    def serialize(self) -> str:
        return f'data: {self.message}\n\n'.encode('utf-8')

и более конкретный c производный случай для связанных с файлом потоковых событий:

class FileStreamEvent(StreamEvent):
    def __init__(self, message: str, name: str):
        super().__init__(message)
        self.name = name

    def serialize(self) -> bytes:
        return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8')

Вы можете создать чрезвычайно примитивный тип публикации / подписки контейнера, где /stage может затем подписывать слушателей, а lengthy_operation() может публиковать sh StreamEvent экземпляры для всех слушателей:

class StreamSource:
    def __init__(self):
        self.listeners: List[Queue] = []

    def put(self, event: StreamEvent):
        for listener in self.listeners:
            listener.put_nowait(event)

    def get(self):
        listener = Queue()
        self.listeners.append(listener)
        try:
            while True:
                event = listener.get()
                yield event.serialize()
        finally:
            self.listeners.remove(listener)

В StreamSource.get() вы, вероятно, захотите создать конечный случай (например, проверить событие "close" или "fini sh") для выхода из общего c while True и вы, вероятно, захотите установить таймаут для блокирующего вызова Queue.get(). Но ради этого примера я сохранил все как есть c.

Теперь lengthy_operation() просто нужна ссылка на StreamSource:

def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO):
    for stage in range(10):
        events.put(FileStreamEvent(f'stage {stage}: begin', name))
        sleep(2)
        events.put(FileStreamEvent(f'stage {stage}: end', name))
    events.put(FileStreamEvent('finished', name))

SSETest банка затем предоставьте общий экземпляр StreamSource для каждого lengthy_operation() вызова и SSETest.stage() может использовать StreamSource.get() для регистрации прослушивателя на этом общем экземпляре:

class SSETest:
    _stream_source: StreamSource = StreamSource()

    @cherrypy.expose
    def index(self):
        return Path('SSETest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(self._stream_source, name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Cache-Control'] = 'no-cache'
        cherrypy.response.headers['Content-Type'] = 'text/event-stream'
        def stream():
            yield from self._stream_source.get()
        return stream()

    stage._cp_config = {'response.stream': True}

Это завершено [ 1] пример того, как решить ваш непосредственный вопрос, но вы, скорее всего, захотите адаптировать его по мере приближения к окончательному опыту пользователя, который вы, вероятно, имеете в виду.

[1]: я пропустил импорт для удобства чтения, поэтому вот они:

from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from time import sleep
from typing import BinaryIO, List
import cherrypy

Условия последующего выхода

Поскольку вы используете cherrypy.quickstart(), в приведенном выше минимальном жизнеспособном решении вам придется принудительно выйти сервис SSETest, так как я не предполагал никаких изящных «остановок» для вас. Первое решение явно указывает на это, но не предлагает решения для удобства чтения.

Давайте рассмотрим несколько способов обеспечения некоторых начальных грациозных условий «остановки»:

Добавьте условие остановки в StreamSource

Сначала, по крайней мере, добавьте разумное условие остановки к StreamSource. Например, добавьте атрибут running, который позволяет изящно завершить работу StreamSource.get() while l oop. Затем установите разумное время ожидания Queue.get(), чтобы l oop мог периодически проверять этот атрибут running между обработкой сообщений. Затем убедитесь, что по крайней мере некоторые соответствующие сообщения шины CherryPy вызывают это поведение остановки. Ниже я ввел все это поведение в класс StreamSource, но вы также можете зарегистрировать отдельный плагин CherryPy уровня приложения для обработки вызовов в StreamSource.stop(), а не делать StreamSource плагином. Я продемонстрирую, как это выглядит, когда я добавлю отдельный обработчик сигнала.

class StreamSource(plugins.SimplePlugin):
    def __init__(self, bus: wspbus.Bus):
        super().__init__(bus)
        self.subscribe()
        self.running = True
        self.listeners: List[Queue] = []

    def graceful(self):
        self.stop()

    def exit(self):
        self.stop()

    def stop(self):
        self.running = False

    def put(self, event: StreamEvent):
        for listener in self.listeners:
            listener.put_nowait(event)

    def get(self):
        listener = Queue()
        self.listeners.append(listener)
        try:
            while self.running:
                try:
                    event = listener.get(timeout=1.0)
                    yield event.serialize()
                except Empty:
                    pass
        finally:
            self.listeners.remove(listener)

Теперь SSETest потребуется инициализировать StreamSource значением шины, поскольку класс теперь равен SimplePlugin:

    _stream_source: StreamSource = StreamSource(cherrypy.engine)

Вы обнаружите, что это решение делает вас намного ближе к тому, что вы, вероятно, хотите с точки зрения пользовательского опыта. Выполните прерывание клавиатуры, и CherryPy начнет останавливать систему, но первое изящное прерывание клавиатуры не будет публиковать sh stop сообщение, для этого вам нужно отправить второе прерывание клавиатуры.

Добавить SIGINT обработчик для захвата прерываний клавиатуры

В связи с тем, что cherrypy.quickstart работает с обработчиками сигналов, вы можете зарегистрировать обработчик SIGINT как CherryPy-совместимый SignalHandler плагин для изящной остановки StreamSource при первом прерывании клавиатуры.

Вот пример:

class SignalHandler(plugins.SignalHandler):
    def __init__(self, bus: wspbus.Bus, sse):
        super().__init__(bus)
        self.handlers = {
            'SIGINT': self.handle_SIGINT,
        }
        self.sse = sse

    def handle_SIGINT(self):
        self.sse.stop()
        raise KeyboardInterrupt()

Обратите внимание, что в этом случае я демонстрирую generi c обработчик уровня приложения, который вы можете затем настроить и инициализировать, изменив ваш запуск cherrypy.quickstart() logi c следующим образом:

sse = SSETest()
SignalHandler(cherrypy.engine, sse).subscribe()
cherrypy.quickstart(sse)

Для этого примера я предоставляю обобщенный метод c application SSETest.stop для инкапсуляции желаемого поведения:

class SSETest:
    _stream_source: StreamSource = StreamSource(cherrypy.engine)

    def stop(self):
        self._stream_source.stop()

Заключительный анализ

Я не пользователь CherryPy, и я только начал смотреть на него в первый раз вчера только для того, чтобы ответить на ваш вопрос, поэтому я оставлю «Лучшие практики CherryPy» на ваше усмотрение.

На самом деле ваша проблема - это * * c комбинация следующих Python вопросов:

  1. как я могу реализовать простой шаблон публикации / подписки? (ответил Queue);
  2. как мне создать условие выхода для подписчика l oop? (отвечает параметром Queue.get() timeout и атрибутом running)
  3. как я могу повлиять на условие выхода с прерываниями клавиатуры? (отвечает с помощью CherryPy-специфицированного c обработчика сигнала, но он просто лежит в основе концепций, которые вы найдете во встроенном * модуле *1134*signal)

Вы можете решить все эти вопросы разными способами, и некоторые склоняются в большей степени к универсальным c решениям "Pythoni c" (мое предпочтение там, где это имеет смысл), в то время как другие используют концепции CherryPy-centri c (и это делает смысл в тех случаях, когда вы хотите расширить поведение CherryPy, а не переписать или сломать его).

Например, вы можете использовать сообщения шины CherryPy для передачи потоковых сообщений, но для меня это запутывает логи вашего приложения c Слишком много в CherryPy-специфичных c функциях, поэтому я, вероятно, нашел бы золотую середину, где вы обрабатываете функции вашего приложения в общем (чтобы не связывать себя * CherryPy с * 1174), как видно из моего StreamSource примера использует стандартный шаблон Python Queue. Вы можете сделать StreamSource плагином, чтобы он мог отвечать на определенные сообщения шины CherryPy напрямую (как я покажу выше), или у вас мог бы быть отдельный плагин, который знает, как вызывать соответствующие специфичные для приложения c домены, такие как как StreamSource.stop() (аналогично тому, что я показываю с SignalHandler).

Наконец, все ваши вопросы великолепны, но на все они, скорее всего, уже были даны ответы на SO как общие c Python вопросы поэтому, пока я связываю ответы здесь с вашей проблемной областью CherryPy, я также хочу помочь вам (и будущим читателям) понять, как относиться к этим конкретным проблемам более абстрактно, чем CherryPy.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...