У меня есть приложение Flask, которое после определенного вызова rest запускает несколько модулей с использованием ProcessPoolExecutor.
ОБНОВЛЕНО: Добавлен redis в качестве очереди сообщений (с использованием docker, redis в качестве хоста redis)
socketio = SocketIO(app, message_queue='redis://redis')
(...)
def emit_event(evt, message):
socketio.emit(evt, message, namespace='/test')
@app.route('/info', methods=['GET'])
def info():
emit_event('update_reports', '')
(...)
if __name__ == "__main__":
socketio.run(host='0.0.0.0', threaded=True)
Теперь, когда я добавил redis, он по-прежнему работает при отправке из основного приложения.
Вот некоторые из кода, который я запускаю подпроцесс:
def __init__(self):
self.executor = futures.ProcessPoolExecutor(max_workers=4)
self.socketio = SocketIO(async_mode='eventlet', message_queue='redis://redis')
(...)
future = self.executor.submit(process, params)
future.add_done_callback(functools.partial(self.finished_callback, pid))
Затем в этом обратном вызове я вызываю метод emit_event
:
def finished_callback(self, pid, future):
pid.status = Status.DONE.value
pid.finished_at = datetime.datetime.utcnow
pid.save()
self.socketio.emit('update_reports', 'done', namespace='/test')
Получение и отправка / отправка сообщений от / к клиенту с моего контроллера работает очень хорошо, также, если я вызываю / info из curl или почтальона, мой клиент получает сообщение - но - при попытке отправить событие таким же способом изнутри этого обратный вызов подпроцесса, теперь он показывает эту ошибку:
Это в основном для уведомлений, например для уведомления о завершении длинного процесса и тому подобного.
INFO:socketio:emitting event "update_reports" to all [/test]
ERROR:socketio:Cannot publish to redis... retrying
ERROR:socketio:Cannot publish to redis... giving up
Что я делаю не так?
Спасибо!