Как вывести в определенный сокет из потока в flask socketio? - PullRequest
1 голос
/ 29 мая 2020
• 1000 все клиенты получают все сообщения.

Вот как выглядит моя клиентская сторона:

// Open the Socket.IO connection to the server.
var socket = io.connect('https://my_server_link');

	// Log once the connection has been established.
	socket.on('connect', function() {
		console.log('Connection Success')
	});

	// Receives the session id issued by the server and asks the server to verify this client.
	// NOTE(review): session_id, api_id and user_id are not declared in this snippet —
	// presumably globals defined elsewhere; confirm.
	socket.on('connection_status',  (data)=>{
		console.log('Connection Status')
		console.log(data);
		session_id = data['session_id']
		socket.emit('verifyClient', {'session_id': session_id, 'api_id': api_id})
	})

	// When verification succeeded (truthy data['status']), request the user setup.
	socket.on('verification_status', (data)=>{
		console.log('Verification Status')
		console.log(data)
		if (data['status']){
			console.log('Verified')
			socket.emit('setupUser', {'session_id': session_id, 'user_id': user_id})
		}
	})

	// When the user setup succeeded, start sending a frame every 5000 ms.
	// The interval handle is kept in running_ops (not declared in this snippet).
	socket.on('user_setup_status', (data)=>{
		console.log('User Setup Status');
		console.log(data);
		if (data['status']){
			console.log('User Setup Successful, Starting monitoring...')
			running_ops = setInterval(()=>{
				op_interval(session_id)
			}, 5000)
		}
	})

	// Draws the current video image via ctx2, encodes canva as a JPEG data URL,
	// passes it through to_blob() and sends the result with the session id
	// to the server in a 'monitor' event.
	// NOTE(review): ctx2, video, canva, to_blob and frame are not declared here —
	// presumably defined elsewhere (ctx2 is presumably the 2D context of canva); confirm.
	function op_interval(session_id){
		ctx2.drawImage(video, 0, 0, video.width, video.height);
		frame = canva.toDataURL('image/jpeg');
		frame = to_blob(frame);
		socket.emit('monitor', {
			'frame':frame,
			'session_id':session_id
			});
	}

	// When the server emits an 'alert' event, log its payload on the client side.
	socket.on('alert', (alert)=>{
		console.log(alert);
		// TODO : Handle Alerts Here
	})

А вот и моя серверная часть.

# Create the Flask application object.
app = Flask(__name__)

# Enable CORS for every resource pattern and every origin.
CORS(app, resources={"*": {"origins": "*"}})

# eventlet.monkey_patch()

# Socket.IO server bound to the app; uses the 'threading' async mode and accepts any origin.
socketio = SocketIO(app, async_mode='threading', cors_allowed_origins="*")

# Set up socket connection to the client and register session id
@socketio.on('connect')
def handle_connection():
	"""Register a new client under a freshly generated 10-letter session id.

	The id is stored in ``active_socket_sessions``, the client joins a room
	named after it, and the id is sent back in a ``connection_status`` event
	addressed to that room.
	"""
	print('Connection Successfull')
	# connection_id = request.sid
	letters = [random.choice(string.ascii_lowercase) for _ in range(10)]
	session_id = ''.join(letters)
	active_socket_sessions[session_id] = {'session_id': session_id}
	join_room(session_id)
	status_payload = {'status': 'success', 'session_id': session_id}
	emit('connection_status', status_payload, room=session_id)


@socketio.on('verifyClient')
def verification(data):
	"""Verify the API client for a session and reply with ``verification_status``.

	Reads ``session_id`` and ``api_id`` from ``data``. For a known session the
	reply is emitted to the session's room; for an unknown one it is emitted
	without an explicit room.
	"""
	session_id = data['session_id']
	client_id = data['api_id']
	verified = False
	message = "Session not found."
	known_session = session_id in active_socket_sessions

	if known_session:
		client, message = verify_client(database, client_id, active_sessions)
		# A non-empty client record means the verification succeeded.
		if client.keys():
			verified = True
			active_socket_sessions[session_id]['client_id'] = client['client_id']

	reply = {'status': verified, 'message': message, 'session_id': session_id}
	if known_session:
		emit('verification_status', reply, room=session_id)
	else:
		emit('verification_status', reply)


@socketio.on('setupUser')
def user_setup(data):
	"""Attach a user to a verified session and reply with ``user_setup_status``.

	Args:
		data: Event payload with ``session_id`` and ``user_id`` keys.

	For a known session the reply is emitted to the session's room; for an
	unknown one it is emitted without an explicit room. A session that has no
	``client_id`` (never verified, or already set up and emptied below) gets a
	failure reply instead of raising ``KeyError``.
	"""
	session_id = data['session_id']
	user_id = data['user_id']
	found = False
	message = 'Session not found.'

	session = active_socket_sessions.get(session_id)
	if session is None:
		emit('user_setup_status', {'status': found, 'message': message, 'session_id': session_id})
		return

	client_id = session.get('client_id')
	if client_id is None:
		message = 'Client not verified.'
	else:
		user, message = get_user(database, {'client_id': client_id}, user_id)

		# A non-empty user record means the lookup succeeded.
		if user:
			found = True
			user['person_encoding'] = str(user['person_encoding'])
			session['user'] = user
			rediss.set_dict(session_id, session)
			# The in-memory entry is emptied once the session is stored through
			# rediss; the key itself stays because monitoring_process only
			# checks for its presence.
			active_socket_sessions[session_id] = {}

	emit('user_setup_status', {'status': found, 'message': message, 'session_id': session_id}, room=session_id)

@socketio.on('disconnect')
def handle_disconnect():
	"""Emit a ``connection_ended`` event when a client disconnects."""
	# TODO: clear the session from Redis and close the socket connection
	# (not implemented here).
	farewell = {'data': 'Successfully Disconnected, Socket Is Closed.'}
	emit('connection_ended', farewell)


@socketio.on('monitor')
def monitor(data):
	"""Hand the received frame and session id to ``monitoring_process`` as a background task."""
	socketio.start_background_task(
		monitoring_process,
		data['frame'],
		data['session_id'],
	)


def monitoring_process(frame, session_id):
	"""Run recognition on one frame and emit an ``alert`` event when it fails.

	Does nothing when ``session_id`` is not a key of ``active_socket_sessions``.

	Args:
		frame: Raw buffer decoded with ``cv2.imdecode`` (presumably the encoded
			image bytes sent by the client; confirm).
		session_id: Key of the stored session; also the room the alert is
			emitted to.
	"""
	if session_id not in active_socket_sessions:
		return

	session = rediss.get_dict(session_id)
	client_id = session['client_id']
	user = session['user']
	# SECURITY NOTE(review): eval() executes whatever string is stored under
	# 'person_encoding'. If that value can ever be influenced by a client,
	# replace this with a safe parser (e.g. ast.literal_eval or JSON).
	user_encoding = np.array(eval(user['person_encoding']))
	person_id = user['person_id']
	browser_info = {}

	frame = cv2.imdecode(np.frombuffer(frame, np.uint8), -1)
	preds = model.model.predict(frame, user_encoding)
	if preds['recognized']:
		return

	# Work on a copy: writing the per-alert fields into the shared
	# vars.BRISO_ALERTS entry would leak them into later alerts and race
	# between background tasks running concurrently.
	alert = dict(vars.BRISO_ALERTS[preds['message']])
	alert['timestamp'] = str(time.time())
	alert['log_id'] = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
	alert['person_id'] = person_id
	log_alert(database, client_id, alert, browser_info)

	preds['session_id'] = session_id
	socketio.emit('alert', preds, room=session_id)


# Start the Socket.IO server only when this file is executed directly.
if __name__ == "__main__":
	# Host and port are read from the `vars` module; debug mode and the reloader are disabled.
	socketio.run(app, host=vars.HOST_NAME, port=vars.SERVER_PORT, debug=False, use_reloader=False)

1 Ответ

0 голосов
/ 30 мая 2020

Итак, как посоветовал сам Мигель Гринберг, когда я задал ему этот вопрос на GitHub, единственное изменение, которое потребовалось, — использовать request.sid вместо собственного session_id и не вызывать метод join_room.

Обновленный (рабочий) код выглядит так:

Для клиентской стороны socket.io:

// Log once the connection has been established.
socket.on('connect', function() {
		console.log('Connection Success')
	});

	// Receives the session id, then chains the handshake through acknowledgement
	// callbacks: verify_client -> setup_user -> start sending a frame every 5000 ms.
	// NOTE(review): neither callback inspects its `status` argument, so monitoring
	// starts even when verification or user setup failed — confirm this is intended.
	// NOTE(review): session_id, api_id, user_id and running_ops are not declared in
	// this snippet — presumably globals defined elsewhere; confirm.
	socket.on('connection_status', (data)=>{
		console.log('Connection Status')
		console.log(data);
		session_id = data['session_id']
		socket.emit('verify_client', {'session_id': session_id, 'api_id': api_id}, (status)=>{
			socket.emit('setup_user', {'session_id': session_id, 'user_id': user_id}, (status)=>{
				running_ops = setInterval(()=>{
					op_interval(session_id)
				}, 5000)
			})
		})
	})
  
  // Draws the current video image via ctx2, encodes canva as a JPEG data URL,
  // passes it through to_blob() and sends the result with the session id
  // to the server in a 'monitor' event.
  // NOTE(review): ctx2, video, canva, to_blob and frame are not declared here —
  // presumably defined elsewhere; confirm.
  function op_interval(session_id){
		ctx2.drawImage(video, 0, 0, video.width, video.height);
		frame = canva.toDataURL('image/jpeg');
		frame = to_blob(frame);
		socket.emit('monitor', {
			'frame':frame,
			'session_id':session_id
			});
	}
  
  // When the server emits an 'alert' event, log its payload on the client side.
	socket.on('alert', (alert)=>{
		console.log(alert);
	})

Для серверной стороны (Python, Flask-SocketIO):

# Set up socket connection to the client and register session id
@socketio.on('connect')
def handle_connection():
	"""Register the connecting client under its ``request.sid``.

	The sid is stored in ``active_socket_sessions`` and sent back to the
	client in a ``connection_status`` event addressed to that sid.
	"""
	print('Connection Successfull')
	sid = request.sid
	active_socket_sessions[sid] = {'session_id': sid}
	status_payload = {'status': 'success', 'session_id': sid}
	emit('connection_status', status_payload, room=sid)


@socketio.on('verify_client')
def verification(data):
	"""Verify the API client of the calling connection.

	Reads ``api_id`` from ``data`` and identifies the session by
	``request.sid``. Returns a JSON string with ``status``, ``message`` and
	``session_id``, which becomes the handler's return value for the event.
	"""
	session_id = request.sid
	client_id = data['api_id']
	verified = False
	message = "Session not found."

	if session_id in active_socket_sessions:
		client, message = verify_client(database, client_id, active_sessions)
		# A non-empty client record means the verification succeeded.
		if client.keys():
			verified = True
			active_socket_sessions[session_id]['client_id'] = client['client_id']

	reply = {'status': verified, 'message': message, 'session_id': session_id}
	return json.dumps(reply)

@socketio.on('setup_user')
def user_setup(data):
	"""Attach a user to the calling connection's verified session.

	Args:
		data: Event payload with a ``user_id`` key.

	Returns:
		A JSON string with ``status``, ``message`` and ``session_id``.

	A session that has no ``client_id`` (never verified, or already set up
	and emptied below) yields a failure reply instead of raising ``KeyError``.
	"""
	session_id = request.sid
	user_id = data['user_id']
	found = False
	message = 'Session not found.'

	session = active_socket_sessions.get(session_id)
	if session is not None:
		client_id = session.get('client_id')
		if client_id is None:
			message = 'Client not verified.'
		else:
			user, message = get_user(database, {'client_id': client_id}, user_id)

			# A non-empty user record means the lookup succeeded.
			if user:
				found = True
				user['person_encoding'] = str(user['person_encoding'])
				session['user'] = user
				rediss.set_dict(session_id, session)
				# The in-memory entry is emptied once the session is stored
				# through rediss; the key itself stays because
				# monitoring_process only checks for its presence.
				active_socket_sessions[session_id] = {}

	return json.dumps({'status': found, 'message': message, 'session_id': session_id})


@socketio.on('disconnect')
def handle_disconnect():
	"""Forget the disconnecting client's session and emit ``connection_ended``.

	Sessions are keyed by ``request.sid`` on connect, so the entry is dropped
	here; otherwise ``active_socket_sessions`` would grow with every
	connection. Frames still queued for this sid are then skipped by
	``monitoring_process``.
	"""
	# pop() with a default: a disconnect for an unknown sid must not raise.
	active_socket_sessions.pop(request.sid, None)
	# TODO: also clear the session stored through rediss (its delete API is
	# not visible here).
	emit('connection_ended', {'data': 'Successfully Disconnected, Socket Is Closed.'})


@socketio.on('monitor')
def monitor(data):
	"""Process one received frame for the calling connection in the background.

	Args:
		data: Event payload with a ``frame`` key.
	"""
	frame = data['frame']
	session_id = request.sid

	# Run monitoring_process exactly once per frame. Starting it in a joined
	# Thread and then again as a background task processed every frame twice
	# (duplicate log entries and alerts) and blocked the handler on join().
	socketio.start_background_task(monitoring_process, frame, session_id)

def monitoring_process(frame, session_id):
	"""Run recognition on one frame and emit an ``alert`` event when it fails.

	Does nothing when ``session_id`` is not a key of ``active_socket_sessions``.

	Args:
		frame: Raw buffer decoded with ``cv2.imdecode`` (presumably the encoded
			image bytes sent by the client; confirm).
		session_id: Key of the stored session; also the room the alert is
			emitted to.
	"""
	if session_id not in active_socket_sessions:
		return

	session = rediss.get_dict(session_id)
	client_id = session['client_id']
	user = session['user']
	# SECURITY NOTE(review): eval() executes whatever string is stored under
	# 'person_encoding'. If that value can ever be influenced by a client,
	# replace this with a safe parser (e.g. ast.literal_eval or JSON).
	user_encoding = np.array(eval(user['person_encoding']))
	person_id = user['person_id']
	browser_info = {}

	frame = cv2.imdecode(np.frombuffer(frame, np.uint8), -1)
	preds = model.model.predict(frame, user_encoding)
	if preds['recognized']:
		return

	# Work on a copy: writing the per-alert fields into the shared
	# vars.ALERTS entry would leak them into later alerts and race between
	# background tasks running concurrently.
	alert = dict(vars.ALERTS[preds['message']])
	alert['timestamp'] = str(time.time())
	alert['log_id'] = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
	alert['person_id'] = person_id
	log_alert(database, client_id, alert, browser_info)

	preds['session_id'] = session_id
	socketio.emit('alert', preds, room=session_id)
...