Итак, у меня есть файл, который работает как сервер сокетов, и мне нужно, чтобы он был в структуре класса. Я хотел бы знать, как вы это сделаете. Существуют некоторые глобальные и неглобальные переменные, например, какой из них будет self. или быть частным. Класс должен быть Server с init , и все методы должны быть включены в класс. После того, как класс написан, код должен создать объект Server и запустить основные методы, а именно:
server_socket.bind(('0.0.0.0', 2303))
server_socket.listen(5)
handle_new_connection()
handle_receive_data()
handle_sending_msgs()
del public_msgs[:]
del private_msgs[:]
public_msgs.append((None, "bye"))
handle_sending_msgs()
server_socket.close()
Вот код:
import socket
import select
import datetime
def get_current_time():
"""Returns hh:mm"""
now = datetime.datetime.now()
time = str(now.hour) + ":" + str(now.minute)
return time
def get_name(user_socket):
"""Returns the name of the user_socket, if he is an admin he gets @ before his name"""
name = sockets_names[user_socket]
if len(admins) != 0:
if user_socket in admins:
name = "@" + name
return name
def get_data_length(data):
"""Returns the length of the data as string with length 3"""
length = str(len(data))
while len(length) < 3:
length = "0" + length
return length
def get_socket_by_name(name):
"""Returns the socket with the name, if none exists return None"""
if len(sockets_names) != 0:
for socket_pair, socket_name in sockets_names.items():
if name == socket_name:
return socket_pair
return None
def get_admins_as_string():
admins_names_lst = []
for admin_socket in admins:
admins_names_lst.append(sockets_names[admin_socket])
return str(admins_names_lst)[1:-1]
def remove_socket(removed_socket):
open_sockets.remove(removed_socket)
if removed_socket in admins:
admins.remove(removed_socket)
del sockets_names[removed_socket]
def handle_new_connection():
for new_connection in rlist:
if new_connection is server_socket:
(new_socket, address) = server_socket.accept()
open_sockets.append(new_socket)
sockets_names[new_socket] = "Anonymous"
if len(admins) == 0:
admins.append(new_socket)
print("New Connection And Admin")
else:
print("New Connection")
def handle_receive_data():
for current_socket in rlist:
if current_socket is not server_socket:
data_length = int(current_socket.recv(3).decode('utf-8'))
data = current_socket.recv(data_length).decode('utf-8')
if data[0] == '/':
handle_commands(current_socket, data[1:])
else:
private_msgs.append((current_socket, "You: " + data))
if current_socket in muted_sockets:
private_msgs.append((current_socket, """"You are muted and so can't send msgs to everyone. You can
ask one of the admins to unmute you in a private msg"""))
else:
public_msgs.append((current_socket, get_name(current_socket) + ": " + data))
def handle_sending_msgs():
for message in public_msgs:
(sender_socket, data) = message
data = get_current_time() + " " + data
for receiver_socket in wlist:
if receiver_socket is not sender_socket:
receiver_socket.send(bytes(get_data_length(data), 'utf8'))
receiver_socket.send(bytes(data, 'utf8'))
if message in public_msgs:
public_msgs.remove(message)
for message in private_msgs:
(receiver_socket, data) = message
data = get_current_time() + " " + data
if receiver_socket in wlist:
receiver_socket.send(get_data_length(data).encode('utf-8'))
receiver_socket.send(data.encode('utf-8'))
if message in private_msgs:
private_msgs.remove(message)
if data.split(' ')[1] == "bye":
remove_socket(receiver_socket)
def handle_commands(current_socket, data):
command = data.split(' ')[0].lower()
data = ' '.join(data.split(' ')[1:])
if command == "exit":
public_msgs.append((current_socket, get_name(current_socket) + " left the chat."))
private_msgs.append((current_socket, "bye"))
print("Connection with " + get_name(current_socket) + " closed.")
elif command == 'rename' or command == 'setname':
if data not in sockets_names.values():
if data.lower() != "you" and data.lower() != "server" and data.lower()[0] != "@":
sockets_names[current_socket] = data
private_msgs.append((current_socket, "Your name has been successfully changed to " + data + "."))
else:
private_msgs.append((current_socket, data + " is not a valid name."))
else:
private_msgs.append((current_socket, "This name is already taken."))
elif command == 'setadmin' or command == "promote":
if current_socket in admins:
if data not in sockets_names.values():
private_msgs.append((current_socket, "This name doesn't exist in this server."))
else:
new_admin_socket = get_socket_by_name(data)
admins.append(new_admin_socket)
private_msgs.append((current_socket, data + " has been promoted to admin."))
public_msgs.append((current_socket, get_name(current_socket) + " promoted " + data + " to admin."))
else:
private_msgs.append((current_socket, "You don't have access to this command."))
elif command == 'kick' or command == 'remove':
if current_socket in admins:
if data not in sockets_names.values():
private_msgs.append((current_socket, "This name doesn't exist in this server."))
else:
kicked_socket = get_socket_by_name(data)
private_msgs.append((current_socket, data + " has been successfully kicked and removed."))
public_msgs.append((current_socket, get_name(current_socket) + " kicked and removed " + data))
private_msgs.append((kicked_socket, get_name(current_socket) + " kicked you."))
private_msgs.append((kicked_socket, "bye"))
elif command == 'mute':
if current_socket in admins:
if data not in sockets_names.values():
private_msgs.append((current_socket, "This name doesn't exist in this server."))
else:
muted_socket = get_socket_by_name(data)
muted_sockets.append(muted_socket)
private_msgs.append((current_socket, data + " has been successfully muted."))
public_msgs.append((current_socket, get_name(current_socket) + " muted " + data))
private_msgs.append((muted_socket, get_name(current_socket) + " muted you."))
else:
private_msgs.append((current_socket, "You are not an admin and so you have no such permissions."))
elif command == 'unmute':
if current_socket in admins:
if data not in sockets_names.values():
private_msgs.append((current_socket, "This name doesn't exist in this server."))
else:
unmute_socket = get_socket_by_name(data)
if unmute_socket not in muted_sockets:
private_msgs.append((current_socket, "This user isn't muted."))
else:
muted_sockets.remove(unmute_socket)
private_msgs.append((current_socket, data + " has been successfully unmuted."))
public_msgs.append((current_socket, get_name(current_socket) + " unmuted " + data))
private_msgs.append((unmute_socket, get_name(current_socket) + " unmuted you."))
else:
private_msgs.append((current_socket, "You are not an admin and so you have no such permissions."))
elif command == 'msg' or command == 'message' or command == "prvmsg" or command == "privatemessage":
send_to_name = data.split(' ')[0]
data = ' '.join(data.split(' ')[1:])
if send_to_name not in sockets_names.values():
private_msgs.append((current_socket, "This name doesn't exist in this server."))
else:
send_to_socket = get_socket_by_name(send_to_name)
private_msgs.append(
(current_socket, "You -> " + send_to_name + ": " + data))
private_msgs.append((send_to_socket, get_name(current_socket) + " -> " + send_to_name + ": " + data))
elif command == 'admin' or command == "admins" or command == "adminlist" or command == "adminslist":
private_msgs.append((current_socket, "Admins: " + get_admins_as_string()))
elif command == 'users' or command == "userslist" or command == 'user' or command == "userlist":
private_msgs.append((current_socket, "Users: " + str(sockets_names.values())[1:-1]))
elif command == 'help' or command == '?':
commands = """/rename <name> - change your name.\n/msg <user> <msg> - will send <msg> as a private massage that only <user>
can see.\n/users - returns the names of all the connected users.\n/admins - returns the names of all the connected admins.\n/exit -
will disconnect you.\n\nAdmins' Commends Only:\n/kick <user> - kick the <user> from the server.\n/promote <user> - will ser <user> to
be an admin.\nmute <user> - will no let him send public msgs, only privates.\nunmute <user> - will cancel the mute on this user."""
private_msgs.append((current_socket, "\nCommands:\n" + commands + "\n"))
else:
private_msgs.append((current_socket, command + " is not a valid command."))
def main():
global server_socket, open_sockets, sockets_names, admins, muted_sockets, public_msgs, private_msgs
global rlist, wlist, xlist
server_socket = socket.socket()
try:
server_socket.bind(('0.0.0.0', 2303))
server_socket.listen(5)
open_sockets = []
sockets_names = {}
admins = []
muted_sockets = []
public_msgs = []
private_msgs = []
while True:
rlist, wlist, xlist = select.select([server_socket] + open_sockets, open_sockets, [])
handle_new_connection()
handle_receive_data()
handle_sending_msgs()
finally:
del public_msgs[:]
del private_msgs[:]
public_msgs.append((None, "bye"))
handle_sending_msgs()
server_socket.close()
if __name__ == '__main__':
main()