Я запускаю приложение Flask через waitress.serve(app, send_bytes=1)
.Я пытаюсь реализовать функциональность, чтобы остановить длительный запрос SQL, если клиент закрывает соединение (например, нажав кнопку «Стоп» в браузере).
Я наткнулся на этот принятый ответ на точно такую же проблему: Прекратите обработку маршрута Flask, если запрос прерван , что предполагает регулярную попытку записи в соединение, чтобы проверить, все ли оно открыто.(Я пошел даже дальше, чем пытался записать 0 байтов, я пытаюсь писать 1-байтовую строку "q" один раз в секунду.) Но когда я реализовал это в своем приложении, он не производит никакого соединения с закрытым сокетом или соединения«закрытые» ошибки, если браузер нажимает «Стоп» во время ожидания ответа.
, то есть ожидаемое поведение состоит в том, что клиент получает «q» один раз в секунду до получения полного ответа, и вывод консоли Python«получение из очереди», за которым следует «соединение все еще живо, очередь пуста», повторяется каждую секунду, пока не будет нажата кнопка «Стоп» в браузере и не возникнет какое-то исключение (у меня есть несколько обработчиков исключений, настроенных здесь, чтобы перехватывать их везде, где они должны отображатьсявверх).Но фактическое поведение заключается в том, что после нажатия кнопки «Стоп» в браузере консоль python продолжает выводить «получение из очереди» и «соединение все еще живо, очередь пуста» каждую секунду до завершения запроса, а затем «поток: поставить в очередь»и "получил из очереди" вошли.Ни в коем случае не выдается ошибка сокета / соединения.
Для сравнения, в файле node.js (на том же компьютере с Windows), используя Express, использование req.on('close', () => {console.log('connection closed');})
немедленно обнаруживает нажатие кнопки Stop в браузере.
Есть идеи о том, почему он так себя ведет?Как я могу определить, закрыл ли клиент соединение в Waitress / Flask в Windows?
query_string = 'xxx'
@app.route('/filter')
def filter_route():
response_from_filter = lambda filter: json.dumps({
...
})
def threaded_query(query_str, queue_obj):
def callable():
try:
filter = db.session.execute(query_string)
queue_obj.put(response_from_filter(filter))
print "thread: put on queue"
except Exception as e:
print e, "thread error"
return callable
def generate(queue_obj):
while True:
try:
print "getting from queue"
result = queue_obj.get(block=True, timeout=1)
print "got from queue"
yield result
return
except GeneratorExit as ge:
print "generator exit"
break
except Queue.Empty as e:
print e, "connection still alive, queue empty"
yield "q"
except Exception as e:
print e, "generic exception"
query_queue = Queue.Queue()
thread = threading.Thread(target=threaded_query(query_string, query_queue))
thread.start()
try:
return Response(stream_with_context(generate(query_queue)))
except Exception as e:
print e, "Response exception"