Я хочу, чтобы уведомления об этих этапах передавались всем подключенным клиентам.
Я подозреваю, что в конце концов вам понадобится больше контроля, но я отвечу на ваш вопрос как это спросили. Позже вы можете захотеть использовать приведенный ниже пример и отфильтровать передаваемые уведомления на основе сеанса пользователя или на основе определенной начальной отметки времени или какой-либо другой соответствующей концепции.
Каждый «подключенный клиент» фактически зависает на длительный запрос к /stage
, который сервер будет использовать для потоковой передачи событий клиенту. В вашем примере каждый клиент немедленно начнет этот запрос и оставит его открытым, пока сервер не завершит поток. Вы также можете закрыть поток от клиента, используя close()
на EventSource
.
Basi c Solution
Вы спросили, как получить /stage
Обработчик транслирует или зеркалирует свои события всем подключенным в данный момент клиентам. Есть много способов сделать это sh, но вкратце вы хотите, чтобы функция lengthy_operation
отправляла события всем читателям обработчиков /stage
или в постоянную общую папку, из которой читали все обработчики /stage
. Я покажу способ инкапсуляции первой идеи, описанной выше.
Рассмотрим обобщенный c потоковый класс событий, который сериализуется в data: <some message>
:
class StreamEvent:
def __init__(self, message: str) -> bytes:
self.message = message
def serialize(self) -> str:
return f'data: {self.message}\n\n'.encode('utf-8')
и более конкретный c производный случай для связанных с файлом потоковых событий:
class FileStreamEvent(StreamEvent):
def __init__(self, message: str, name: str):
super().__init__(message)
self.name = name
def serialize(self) -> bytes:
return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8')
Вы можете создать чрезвычайно примитивный тип публикации / подписки контейнера, где /stage
может затем подписывать слушателей, а lengthy_operation()
может публиковать sh StreamEvent
экземпляры для всех слушателей:
class StreamSource:
def __init__(self):
self.listeners: List[Queue] = []
def put(self, event: StreamEvent):
for listener in self.listeners:
listener.put_nowait(event)
def get(self):
listener = Queue()
self.listeners.append(listener)
try:
while True:
event = listener.get()
yield event.serialize()
finally:
self.listeners.remove(listener)
В StreamSource.get()
вы, вероятно, захотите создать конечный случай (например, проверить событие "close" или "fini sh") для выхода из общего c while True
и вы, вероятно, захотите установить таймаут для блокирующего вызова Queue.get()
. Но ради этого примера я сохранил все как есть c.
Теперь lengthy_operation()
просто нужна ссылка на StreamSource
:
def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO):
for stage in range(10):
events.put(FileStreamEvent(f'stage {stage}: begin', name))
sleep(2)
events.put(FileStreamEvent(f'stage {stage}: end', name))
events.put(FileStreamEvent('finished', name))
SSETest
банка затем предоставьте общий экземпляр StreamSource
для каждого lengthy_operation()
вызова и SSETest.stage()
может использовать StreamSource.get()
для регистрации прослушивателя на этом общем экземпляре:
class SSETest:
_stream_source: StreamSource = StreamSource()
@cherrypy.expose
def index(self):
return Path('SSETest.html').read_text()
@cherrypy.expose
def upload(self, file):
name = file.filename.encode('iso-8859-1').decode('utf-8')
lengthy_operation(self._stream_source, name, file.file)
return 'OK'
@cherrypy.expose
def stage(self):
cherrypy.response.headers['Cache-Control'] = 'no-cache'
cherrypy.response.headers['Content-Type'] = 'text/event-stream'
def stream():
yield from self._stream_source.get()
return stream()
stage._cp_config = {'response.stream': True}
Это завершено [ 1] пример того, как решить ваш непосредственный вопрос, но вы, скорее всего, захотите адаптировать его по мере приближения к окончательному опыту пользователя, который вы, вероятно, имеете в виду.
[1]: я пропустил импорт для удобства чтения, поэтому вот они:
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from time import sleep
from typing import BinaryIO, List
import cherrypy
Условия последующего выхода
Поскольку вы используете cherrypy.quickstart()
, в приведенном выше минимальном жизнеспособном решении вам придется принудительно выйти сервис SSETest
, так как я не предполагал никаких изящных «остановок» для вас. Первое решение явно указывает на это, но не предлагает решения для удобства чтения.
Давайте рассмотрим несколько способов обеспечения некоторых начальных грациозных условий «остановки»:
Добавьте условие остановки в StreamSource
Сначала, по крайней мере, добавьте разумное условие остановки к StreamSource
. Например, добавьте атрибут running
, который позволяет изящно завершить работу StreamSource.get()
while
l oop. Затем установите разумное время ожидания Queue.get()
, чтобы l oop мог периодически проверять этот атрибут running
между обработкой сообщений. Затем убедитесь, что по крайней мере некоторые соответствующие сообщения шины CherryPy вызывают это поведение остановки. Ниже я ввел все это поведение в класс StreamSource
, но вы также можете зарегистрировать отдельный плагин CherryPy уровня приложения для обработки вызовов в StreamSource.stop()
, а не делать StreamSource
плагином. Я продемонстрирую, как это выглядит, когда я добавлю отдельный обработчик сигнала.
class StreamSource(plugins.SimplePlugin):
def __init__(self, bus: wspbus.Bus):
super().__init__(bus)
self.subscribe()
self.running = True
self.listeners: List[Queue] = []
def graceful(self):
self.stop()
def exit(self):
self.stop()
def stop(self):
self.running = False
def put(self, event: StreamEvent):
for listener in self.listeners:
listener.put_nowait(event)
def get(self):
listener = Queue()
self.listeners.append(listener)
try:
while self.running:
try:
event = listener.get(timeout=1.0)
yield event.serialize()
except Empty:
pass
finally:
self.listeners.remove(listener)
Теперь SSETest
потребуется инициализировать StreamSource
значением шины, поскольку класс теперь равен SimplePlugin
:
_stream_source: StreamSource = StreamSource(cherrypy.engine)
Вы обнаружите, что это решение делает вас намного ближе к тому, что вы, вероятно, хотите с точки зрения пользовательского опыта. Выполните прерывание клавиатуры, и CherryPy начнет останавливать систему, но первое изящное прерывание клавиатуры не будет публиковать sh stop
сообщение, для этого вам нужно отправить второе прерывание клавиатуры.
Добавить SIGINT обработчик для захвата прерываний клавиатуры
В связи с тем, что cherrypy.quickstart
работает с обработчиками сигналов, вы можете зарегистрировать обработчик SIGINT
как CherryPy-совместимый SignalHandler
плагин для изящной остановки StreamSource
при первом прерывании клавиатуры.
Вот пример:
class SignalHandler(plugins.SignalHandler):
def __init__(self, bus: wspbus.Bus, sse):
super().__init__(bus)
self.handlers = {
'SIGINT': self.handle_SIGINT,
}
self.sse = sse
def handle_SIGINT(self):
self.sse.stop()
raise KeyboardInterrupt()
Обратите внимание, что в этом случае я демонстрирую generi c обработчик уровня приложения, который вы можете затем настроить и инициализировать, изменив ваш запуск cherrypy.quickstart()
logi c следующим образом:
sse = SSETest()
SignalHandler(cherrypy.engine, sse).subscribe()
cherrypy.quickstart(sse)
Для этого примера я предоставляю обобщенный метод c application SSETest.stop
для инкапсуляции желаемого поведения:
class SSETest:
_stream_source: StreamSource = StreamSource(cherrypy.engine)
def stop(self):
self._stream_source.stop()
Заключительный анализ
Я не пользователь CherryPy, и я только начал смотреть на него в первый раз вчера только для того, чтобы ответить на ваш вопрос, поэтому я оставлю «Лучшие практики CherryPy» на ваше усмотрение.
На самом деле ваша проблема - это * * c комбинация следующих Python вопросов:
- как я могу реализовать простой шаблон публикации / подписки? (ответил
Queue
); - как мне создать условие выхода для подписчика l oop? (отвечает параметром
Queue.get()
timeout
и атрибутом running
) - как я могу повлиять на условие выхода с прерываниями клавиатуры? (отвечает с помощью CherryPy-специфицированного c обработчика сигнала, но он просто лежит в основе концепций, которые вы найдете во встроенном * модуле *1134*
signal
)
Вы можете решить все эти вопросы разными способами, и некоторые склоняются в большей степени к универсальным c решениям "Pythoni c" (мое предпочтение там, где это имеет смысл), в то время как другие используют концепции CherryPy-centri c (и это делает смысл в тех случаях, когда вы хотите расширить поведение CherryPy, а не переписать или сломать его).
Например, вы можете использовать сообщения шины CherryPy для передачи потоковых сообщений, но для меня это запутывает логи вашего приложения c Слишком много в CherryPy-специфичных c функциях, поэтому я, вероятно, нашел бы золотую середину, где вы обрабатываете функции вашего приложения в общем (чтобы не связывать себя * CherryPy с * 1174), как видно из моего StreamSource
примера использует стандартный шаблон Python Queue
. Вы можете сделать StreamSource
плагином, чтобы он мог отвечать на определенные сообщения шины CherryPy напрямую (как я покажу выше), или у вас мог бы быть отдельный плагин, который знает, как вызывать соответствующие специфичные для приложения c домены, такие как как StreamSource.stop()
(аналогично тому, что я показываю с SignalHandler
).
Наконец, все ваши вопросы великолепны, но на все они, скорее всего, уже были даны ответы на SO как общие c Python вопросы поэтому, пока я связываю ответы здесь с вашей проблемной областью CherryPy, я также хочу помочь вам (и будущим читателям) понять, как относиться к этим конкретным проблемам более абстрактно, чем CherryPy.