У меня есть следующая система: [Клиент] - [Веб-сервер] - [Connecotr] .
Соединитель - это своего рода промежуточный код между веб-сервером и источником данных .
Мне нужно контролировать соединение сервера с разъемом. Если соединение потеряно, я должен уведомить клиента.
Связь между веб-сервером и соединителем организована с помощью socketio.
Проблема заключается в том, что если соединитель перестает работать, то веб-сервер узнает об этом только через минуту (это в лучшем случае).
Я решил, что сервер должен проверять состояние соединителя каждую секунду.
Когда соединитель при подключении к серверу запускается фоновая задача. Суть задачи: каждую секунду: 1) фиксировать время; 2) сохранить фиксированное время в стеке; 3) отправить эхо-сообщение на соединитель. (см. server.background_thread)
Соединитель принимает эхо-сообщение и метку времени в качестве параметра и отправляет эхо-сообщение на веб-сервер, в качестве параметра он передает полученную метку времени. (см. client.echo)
Веб-сервер получает эхо-сообщение, если временная метка равна последнему значению в стеке, то это значение удаляется из стека. (см. server.on_echo_connector)
На веб-сервере на каждой итерации проверяется размер стека (см. server.background_thread). Если оно больше 5, это означает, что соединитель не отвечал на эхо-сообщение 5 раз, мы считаем, что соединитель недоступен, и отсоединяемся от него.
Когда сервер обнаружит, что соединитель недоступно, необходимо завершить поток, который отправил эхо-сообщения на соединитель.
Как только размер стека больше 5, я выхожу из бесконечного l oop и вызываю flask_socketio.disconnect (connector_sid, '/ connector')
. После этого вызова ничего не работает (например, print
)
В методе on_disconnect_connector
(сервер), thread.join()
вызывается и никогда не завершается.
И мне нужно завершить поток, чтобы при повторном запуске разъема он успешно подключался и все начиналось заново.
Как решить эту проблему?
server
# -*- coding: utf-8 -*-
import os
import threading
import time
import collections
from datetime import datetime
import flask
import flask_socketio
def get_unix_time():
return int(time.mktime(datetime.now().timetuple()))
class Stack(collections.deque):
def __init__(self, iterable=(), maxlen=None):
collections.deque.__init__(self, iterable, maxlen)
@property
def size(self):
return len(self)
@property
def empty(self):
return self.size == 0
@property
def head(self):
return self[-1]
@property
def tail(self):
return self[0]
def push(self, x):
self.append(x)
# SERVER
app = flask.Flask(__name__)
sio = flask_socketio.SocketIO(app, async_mode='gevent')
connector_sid = None
echo_stack = Stack()
thread = None
thread_lock = threading.Lock()
def background_thread(app):
time.sleep(2) # delay for normal connection
while True:
if echo_stack.size >= 5:
break
time_ = get_unix_time()
echo_stack.push(time_)
sio.emit('echo', time_, namespace='/connector')
sio.sleep(1)
with app.app_context():
flask_socketio.disconnect(connector_sid, '/connector')
@sio.on('connect', namespace='/connector')
def on_connect_connector():
"""Connector connection event handler."""
global connector_sid, thread
print 'Attempt to connect a connector {}...'.format(request.sid)
# if the connector is already connected, reject the connection
if connector_sid is not None:
print 'Connection for connector {} rejected'.format(request.sid)
return False
# raise flask_socketio.ConnectionRefusedError('Connector already connected')
connector_sid = request.sid
print('Connector {} connected'.format(request.sid))
with thread_lock:
if thread is None:
thread = sio.start_background_task(
background_thread, current_app._get_current_object())
# notify clients about connecting a connector
sio.emit('set_connector_status', True, namespace='/client')
@sio.on('disconnect', namespace='/connector')
def on_disconnect_connector():
"""Connector disconnect event handler."""
global connector_sid, thread
print 'start join'
thread.join()
print 'end join'
thread = None
print 'after disconet:', thread
connector_sid = None
echo_stack.clear()
print('Connector {} disconnect'.format(request.sid))
# notify clients of disconnected connector
sio.emit('set_connector_status', False, namespace='/client')
@sio.on('echo', namespace='/connector')
def on_echo_connector(time_):
if not echo_stack.empty:
if echo_stack.head == time_:
echo_stack.pop()
@sio.on('message', namespace='/connector')
def on_message_connector(cnt):
# print 'Msg: {}'.format(cnt)
pass
if __name__ == '__main__':
sio.run(app)
client
# -*- coding: utf-8 -*-
import sys
import threading
import time
import socketio
import socketio.exceptions
sio = socketio.Client()
thread = None
thread_lock = threading.Lock()
work = False
def background_thread():
# example task
cnt = 0
while work:
cnt += 1
if cnt % 10 == 0:
sio.emit('message', cnt // 10, namespace='/connector')
sio.sleep(0.1)
@sio.on('connect', namespace='/connector')
def on_connect():
"""Server connection event handler."""
global thread, work
print '\n----- Connected to server -----' \
'\n----- My SID: {} -----\n'.format(sio.sid)
work = True # set flag
# run test task
with thread_lock:
if thread is None:
thread = sio.start_background_task(background_thread)
@sio.on('disconnect', namespace='/connector')
def on_disconnect():
"""Server disconnect event handler."""
global thread, work
# clear the work flag so that at the next iteration the endless loop ends
work = False
thread.join()
thread = None
# disconnect from server
sio.disconnect()
print '\n----- Disconnected from server -----\n'
# switch to the mode of infinite attempts to connect to the server
main()
@sio.on('echo', namespace='/connector')
def on_echo(time_):
sio.emit('echo', time_, namespace='/connector')
def main():
while True:
try:
sio.connect('http://localhost:5000/connector',
namespaces=['/connector'])
sio.wait()
except socketio.exceptions.ConnectionError:
print 'Trying to connect to the server...'
time.sleep(1)
except KeyboardInterrupt:
print '\n---------- EXIT ---------\n'
sys.exit()
except Exception as e:
print e
if __name__ == '__main__':
print '\n---------- START CLIENT ----------\n'
main()
Python 2,7