У меня есть функция машинного обучения, которая зацикливает видео в реальном времени, если алгоритм находит неисправность в видео, мне нужно отправить уведомление во внешний интерфейс, я использую Tornado Web и Python. я использовал Request-Handler и Websocket-Handler, но функция не позволяет серверу нормально работать, потому что он блокируется в цикле.
так как я могу использовать эту функцию на моем сервере.
class DataSource(object):
def __init__(self, initial_data=None):
self._data = initial_data
@property
def data(self):
return self._data
@data.setter
def data(self, new_data):
self._data = new_data
class EventSource(web.RequestHandler):
def initialize(self, source):
self.source = source
self._last = None
self.set_header('content-type', 'text/event-stream')
self.set_header('cache-control', 'no-cache')
@tornado.gen.coroutine
def publish(self, data):
try:
self.write('data: {}\n\n'.format(data))
yield self.flush()
except StreamClosedError:
pass
@tornado.gen.coroutine
def get(self):
while True:
if self.source.data != self._last:
yield self.publish(self.source.data)
self._last = self.source.data
else:
yield tornado.gen.sleep(0.005)
generator = processing.video_processing() #loop function (yields)
publisher = DataSource(next(generator))
def get_next():
publisher.data = next(generator)
print(publisher.data)
checker = PeriodicCallback(lambda: get_next(), 1000.)
checker.start()
class Application(tornado.web.Application):
def __init__(self):
handlers = [
tornado.web.url(r"/charts", ChartPageHandler, name="charts"),
tornado.web.url(r"/historique_images", HistoriqueImagesPageHandler, name="historique_images"),
tornado.web.url(r"/tables", TablePageHandler, name="tables"),
tornado.web.url(r"/streaming",StreamingPageHandler, name="streaming"),
tornado.web.url(r'/ws/', WebSocketHandler, name="websocket"),
tornado.web.url(r'/events', EventSource, dict(source=publisher)),
tornado.web.url(r'/ws_stream/', StreamSocketHandler, name="ws_stream"),
]
settings = dict(
template_path = os.path.join(os.path.dirname(__file__), "templates"),
static_path = os.path.join(os.path.dirname(__file__), "static"),
debug = True,
cookie_secret = "bZJc2sWbQLKos6GkHn/VB9oXwQt8S0R0kRvJ5/xJ89E=",
login_url = "/login",
xsrf_cookies = True,
)
super(Application, self).__init__(handlers, **settings)
if __name__ == "__main__":
options.parse_command_line()
app = Application()
server = HTTPServer(app)
server.listen(config.PORT)
signal.signal(signal.SIGINT, lambda x, y:IOLoop.instance().stop())
IOLoop.instance().start()