В настоящее время я работаю над школьным проектом, задачей которого, помимо прочего, является настройка многопоточной системы сервер / клиент. Предполагается, что каждому клиенту в системе при подключении к нему назначается собственный поток на сервере. Кроме того, я хотел бы, чтобы сервер запускал другие потоки, один из которых касался ввода из командной строки, а другой - широковещательной рассылки сообщений всем клиентам. Однако я не могу заставить это работать так, как я хочу. Кажется, что потоки блокируют друг друга. Я хотел бы, чтобы моя программа принимала входные данные из командной строки, в то же время, когда сервер прослушивает подключенных клиентов, и так далее.
Я новичок в программировании на Python и многопоточности, и, хотя я думаю, что моя идея хороша, я не удивлен, что мой код не работает. Дело в том, что я не совсем уверен, как я собираюсь реализовать передачу сообщений между различными потоками. Я также не уверен, как правильно реализовать команды блокировки ресурса. Я собираюсь опубликовать код для моего файла сервера и моего файла клиента здесь, и я надеюсь, что кто-то может помочь мне с этим. Я думаю, что на самом деле это должны быть два относительно простых сценария. Я попытался прокомментировать мой код настолько хорошо, насколько это возможно.
import select
import socket
import sys
import threading
import client
class Server:
#initializing server socket
def __init__(self, event):
self.host = 'localhost'
self.port = 50000
self.backlog = 5
self.size = 1024
self.server = None
self.server_running = False
self.listen_threads = []
self.local_threads = []
self.clients = []
self.serverSocketLock = None
self.cmdLock = None
#here i have also declared some events for the command line input
#and the receive function respectively, not sure if correct
self.cmd_event = event
self.socket_event = event
def openSocket(self):
#binding server to port
try:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((self.host, self.port))
self.server.listen(5)
print "Listening to port " + str(self.port) + "..."
except socket.error, (value,message):
if self.server:
self.server.close()
print "Could not open socket: " + message
sys.exit(1)
def run(self):
self.openSocket()
#making Rlocks for the socket and for the command line input
self.serverSocketLock = threading.RLock()
self.cmdLock = threading.RLock()
#set blocking to non-blocking
self.server.setblocking(0)
#making two threads always running on the server,
#one for the command line input, and one for broadcasting (sending)
cmd_thread = threading.Thread(target=self.server_cmd)
broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients])
cmd_thread.daemon = True
broadcast_thread.daemon = True
#append the threads to thread list
self.local_threads.append(cmd_thread)
self.local_threads.append(broadcast_thread)
cmd_thread.start()
broadcast_thread.start()
self.server_running = True
while self.server_running:
#connecting to "knocking" clients
try:
c = client.Client(self.server.accept())
self.clients.append(c)
print "Client " + str(c.address) + " connected"
#making a thread for each clientn and appending it to client list
listen_thread = threading.Thread(target=self.listenToClient,args=[c])
self.listen_threads.append(listen_thread)
listen_thread.daemon = True
listen_thread.start()
#setting event "client has connected"
self.socket_event.set()
except socket.error, (value, message):
continue
#close threads
self.server.close()
print "Closing client threads"
for c in self.listen_threads:
c.join()
def listenToClient(self, c):
while self.server_running:
#the idea here is to wait until the thread gets the message "client
#has connected"
self.socket_event.wait()
#then clear the event immidiately...
self.socket_event.clear()
#and aquire the socket resource
self.serverSocketLock.acquire()
#the below is the receive thingy
try:
recvd_data = c.client.recv(self.size)
if recvd_data == "" or recvd_data == "close\n":
print "Client " + str(c.address) + (" disconnected...")
self.socket_event.clear()
self.serverSocketLock.release()
return
print recvd_data
#I put these here to avoid locking the resource if no message
#has been received
self.socket_event.clear()
self.serverSocketLock.release()
except socket.error, (value, message):
continue
def server_cmd(self):
#this is a simple command line utility
while self.server_running:
#got to have a smart way to make this work
self.cmd_event.wait()
self.cmd_event.clear()
self.cmdLock.acquire()
cmd = sys.stdin.readline()
if cmd == "":
continue
if cmd == "close\n":
print "Server shutting down..."
self.server_running = False
self.cmdLock.release()
def broadcast(self, clients):
while self.server_running:
#this function will broadcast a message received from one
#client, to all other clients, but i guess any thread
#aspects applied to the above, will work here also
try:
send_data = sys.stdin.readline()
if send_data == "":
continue
else:
for c in clients:
c.client.send(send_data)
self.serverSocketLock.release()
self.cmdLock.release()
except socket.error, (value, message):
continue
if __name__ == "__main__":
e = threading.Event()
s = Server(e)
s.run()
А потом клиентский файл
import select
import socket
import sys
import server
import threading
class Client(threading.Thread):
#initializing client socket
def __init__(self,(client,address)):
threading.Thread.__init__(self)
self.client = client
self.address = address
self.size = 1024
self.client_running = False
self.running_threads = []
self.ClientSocketLock = None
def run(self):
#connect to server
self.client.connect(('localhost',50000))
#making a lock for the socket resource
self.clientSocketLock = threading.Lock()
self.client.setblocking(0)
self.client_running = True
#making two threads, one for receiving messages from server...
listen = threading.Thread(target=self.listenToServer)
#...and one for sending messages to server
speak = threading.Thread(target=self.speakToServer)
#not actually sure wat daemon means
listen.daemon = True
speak.daemon = True
#appending the threads to the thread-list
self.running_threads.append(listen)
self.running_threads.append(speak)
listen.start()
speak.start()
#this while-loop is just for avoiding the script terminating
while self.client_running:
dummy = 1
#closing the threads if the client goes down
print "Client operating on its own"
self.client.close()
#close threads
for t in self.running_threads:
t.join()
return
#defining "listen"-function
def listenToServer(self):
while self.client_running:
#here i acquire the socket to this function, but i realize I also
#should have a message passing wait()-function or something
#somewhere
self.clientSocketLock.acquire()
try:
data_recvd = self.client.recv(self.size)
print data_recvd
except socket.error, (value,message):
continue
#releasing the socket resource
self.clientSocketLock.release()
#defining "speak"-function, doing much the same as for the above function
def speakToServer(self):
while self.client_running:
self.clientSocketLock.acquire()
try:
send_data = sys.stdin.readline()
if send_data == "close\n":
print "Disconnecting..."
self.client_running = False
else:
self.client.send(send_data)
except socket.error, (value,message):
continue
self.clientSocketLock.release()
if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()
Я понимаю, что вам нужно прочитать несколько строк кода, но, как я уже сказал, я думаю, что концепция и сценарий сами по себе должны быть довольно просты для понимания. Было бы очень приятно, если бы кто-нибудь помог мне правильно синхронизировать мои потоки =)
Заранее спасибо
--- Редактировать ---
OK. Итак, теперь я упростил свой код до того, что он просто содержит функции отправки и получения как на сервере, так и на клиентских модулях. Клиенты, подключающиеся к серверу, получают свои собственные потоки, а функции отправки и получения в обоих модулях работают в своих собственных отдельных потоках. Это работает как чудо, с функцией широковещания в модуле сервера, отображающей строки, которые он получает от одного клиента всем клиентам. Пока все хорошо!
Следующее, что я хочу, чтобы мой скрипт выполнял, - это принятие определенных команд, то есть «close», в клиентском модуле, чтобы завершить работу клиента и присоединить все запущенные потоки в списке потоков. Я использую флаг события, чтобы уведомить listenToServer и основной поток, что поток speakToServer прочитал вход «close». Кажется, что основной поток выпрыгивает из цикла while и запускает цикл for, который должен присоединиться к другим потокам. Но тут висит. Кажется, что цикл while в потоке listenToServer никогда не останавливается, даже если server_running должен быть установлен в False, когда установлен флаг события.
Я публикую здесь только клиентский модуль, потому что я полагаю, что ответ для синхронизации этих двух потоков будет касаться синхронизации большего количества потоков как в клиентском, так и в серверном модуле.
import select
import socket
import sys
import server_bygg0203
import threading
from time import sleep
class Client(threading.Thread):
#initializing client socket
def __init__(self,(client,address)):
threading.Thread.__init__(self)
self.client = client
self.address = address
self.size = 1024
self.client_running = False
self.running_threads = []
self.ClientSocketLock = None
self.disconnected = threading.Event()
def run(self):
#connect to server
self.client.connect(('localhost',50000))
#self.client.setblocking(0)
self.client_running = True
#making two threads, one for receiving messages from server...
listen = threading.Thread(target=self.listenToServer)
#...and one for sending messages to server
speak = threading.Thread(target=self.speakToServer)
#not actually sure what daemon means
listen.daemon = True
speak.daemon = True
#appending the threads to the thread-list
self.running_threads.append((listen,"listen"))
self.running_threads.append((speak, "speak"))
listen.start()
speak.start()
while self.client_running:
#check if event is set, and if it is
#set while statement to false
if self.disconnected.isSet():
self.client_running = False
#closing the threads if the client goes down
print "Client operating on its own"
self.client.shutdown(1)
self.client.close()
#close threads
#the script hangs at the for-loop below, and
#refuses to close the listen-thread (and possibly
#also the speak thread, but it never gets that far)
for t in self.running_threads:
print "Waiting for " + t[1] + " to close..."
t[0].join()
self.disconnected.clear()
return
#defining "speak"-function
def speakToServer(self):
#sends strings to server
while self.client_running:
try:
send_data = sys.stdin.readline()
self.client.send(send_data)
#I want the "close" command
#to set an event flag, which is being read by all other threads,
#and, at the same time set the while statement to false
if send_data == "close\n":
print "Disconnecting..."
self.disconnected.set()
self.client_running = False
except socket.error, (value,message):
continue
return
#defining "listen"-function
def listenToServer(self):
#receives strings from server
while self.client_running:
#check if event is set, and if it is
#set while statement to false
if self.disconnected.isSet():
self.client_running = False
try:
data_recvd = self.client.recv(self.size)
print data_recvd
except socket.error, (value,message):
continue
return
if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()
Позже, когда я получу эту серверную / клиентскую систему, я буду использовать эту систему на некоторых моделях лифтов, которые у нас есть здесь в лаборатории, причем каждый клиент получает заказы на этаж или звонки «вверх» и «вниз». Сервер будет запускать алгоритм распределения и обновлять очереди лифта на клиентах, которые наиболее подходят для запрошенного заказа. Я понимаю, что это долгий путь, но я думаю, что нужно сделать один шаг за раз =)
Надеюсь, у кого-то есть время разобраться в этом. Заранее спасибо.