Синхронизация потоков в Python - PullRequest
1 голос
/ 01 марта 2012

В настоящее время я работаю над школьным проектом, задачей которого, помимо прочего, является настройка многопоточной системы сервер / клиент. Предполагается, что каждому клиенту в системе при подключении к нему назначается собственный поток на сервере. Кроме того, я хотел бы, чтобы сервер запускал другие потоки, один из которых касался ввода из командной строки, а другой - широковещательной рассылки сообщений всем клиентам. Однако я не могу заставить это работать так, как я хочу. Кажется, что потоки блокируют друг друга. Я хотел бы, чтобы моя программа принимала входные данные из командной строки, в то же время, когда сервер прослушивает подключенных клиентов, и так далее.

Я новичок в программировании на Python и многопоточности, и, хотя я думаю, что моя идея хороша, я не удивлен, что мой код не работает. Дело в том, что я не совсем уверен, как я собираюсь реализовать передачу сообщений между различными потоками. Я также не уверен, как правильно реализовать команды блокировки ресурса. Я собираюсь опубликовать код для моего файла сервера и моего файла клиента здесь, и я надеюсь, что кто-то может помочь мне с этим. Я думаю, что на самом деле это должны быть два относительно простых сценария. Я попытался прокомментировать мой код настолько хорошо, насколько это возможно.

import select
import socket
import sys
import threading
import client

class Server:

#initializing server socket
def __init__(self, event):
    self.host = 'localhost'
    self.port = 50000
    self.backlog = 5
    self.size = 1024
    self.server = None
    self.server_running = False
    self.listen_threads = []
    self.local_threads = []
    self.clients = []
    self.serverSocketLock = None
    self.cmdLock = None
    #here i have also declared some events for the command line input
    #and the receive function respectively, not sure if correct
    self.cmd_event = event
    self.socket_event = event

def openSocket(self):
    #binding server to port
    try: 
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((self.host, self.port))
        self.server.listen(5)
        print "Listening to port " + str(self.port) + "..."
    except socket.error, (value,message):
        if self.server:
            self.server.close()
        print "Could not open socket: " + message
        sys.exit(1)

def run(self):
    self.openSocket()

    #making Rlocks for the socket and for the command line input

    self.serverSocketLock = threading.RLock()
    self.cmdLock = threading.RLock()

    #set blocking to non-blocking
    self.server.setblocking(0)
    #making two threads always running on the server,
    #one for the command line input, and one for broadcasting (sending)
    cmd_thread = threading.Thread(target=self.server_cmd)
    broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients])
    cmd_thread.daemon = True
    broadcast_thread.daemon = True
    #append the threads to thread list
    self.local_threads.append(cmd_thread)
    self.local_threads.append(broadcast_thread)

    cmd_thread.start()
    broadcast_thread.start()


    self.server_running = True
    while self.server_running:

        #connecting to "knocking" clients
        try:
            c = client.Client(self.server.accept())
            self.clients.append(c)
            print "Client " + str(c.address) + " connected"

            #making a thread for each clientn and appending it to client list
            listen_thread = threading.Thread(target=self.listenToClient,args=[c])
            self.listen_threads.append(listen_thread)
            listen_thread.daemon = True
            listen_thread.start()
            #setting event "client has connected"
            self.socket_event.set()

        except socket.error, (value, message):
            continue

    #close threads

    self.server.close()
    print "Closing client threads"
    for c in self.listen_threads:
        c.join()

def listenToClient(self, c):

    while self.server_running:

        #the idea here is to wait until the thread gets the message "client
        #has connected"
        self.socket_event.wait()
        #then clear the event immidiately...
        self.socket_event.clear()
        #and aquire the socket resource
        self.serverSocketLock.acquire()

        #the below is the receive thingy

        try:
            recvd_data = c.client.recv(self.size)
            if recvd_data == "" or recvd_data == "close\n":
                print "Client " + str(c.address) + (" disconnected...")
                self.socket_event.clear()
                self.serverSocketLock.release()
                return

            print recvd_data

            #I put these here to avoid locking the resource if no message 
            #has been received
            self.socket_event.clear()
            self.serverSocketLock.release()
        except socket.error, (value, message):
            continue            

def server_cmd(self):

    #this is a simple command line utility
    while self.server_running:

        #got to have a smart way to make this work

        self.cmd_event.wait()
        self.cmd_event.clear()
        self.cmdLock.acquire()


        cmd = sys.stdin.readline()
        if cmd == "":
            continue
        if cmd == "close\n":
            print "Server shutting down..."
            self.server_running = False

        self.cmdLock.release()


def broadcast(self, clients):
    while self.server_running:

        #this function will broadcast a message received from one
        #client, to all other clients, but i guess any thread
        #aspects applied to the above, will work here also

        try:
            send_data = sys.stdin.readline()
            if send_data == "":
                continue
            else:
                for c in clients:
                    c.client.send(send_data)
            self.serverSocketLock.release()
            self.cmdLock.release()
        except socket.error, (value, message):
            continue

if __name__ == "__main__":
e = threading.Event()
s = Server(e)
s.run()

А потом клиентский файл

import select
import socket
import sys
import server
import threading

class Client(threading.Thread):

#initializing client socket

def __init__(self,(client,address)):
    threading.Thread.__init__(self) 
    self.client = client 
    self.address = address
    self.size = 1024
    self.client_running = False
    self.running_threads = []
    self.ClientSocketLock = None

def run(self):

    #connect to server
    self.client.connect(('localhost',50000))

    #making a lock for the socket resource
    self.clientSocketLock = threading.Lock()
    self.client.setblocking(0)
    self.client_running = True

    #making two threads, one for receiving messages from server...
    listen = threading.Thread(target=self.listenToServer)

    #...and one for sending messages to server
    speak = threading.Thread(target=self.speakToServer)

    #not actually sure wat daemon means
    listen.daemon = True
    speak.daemon = True

    #appending the threads to the thread-list
    self.running_threads.append(listen)
    self.running_threads.append(speak)
    listen.start()
    speak.start()

    #this while-loop is just for avoiding the script terminating
    while self.client_running:
        dummy = 1

    #closing the threads if the client goes down
    print "Client operating on its own"
    self.client.close()

    #close threads
    for t in self.running_threads:
        t.join()
    return

#defining "listen"-function
def listenToServer(self):
    while self.client_running:

        #here i acquire the socket to this function, but i realize I also
        #should have a message passing wait()-function or something
        #somewhere
        self.clientSocketLock.acquire()

        try:
            data_recvd = self.client.recv(self.size)
            print data_recvd
        except socket.error, (value,message):
            continue

        #releasing the socket resource
        self.clientSocketLock.release()

#defining "speak"-function, doing much the same as for the above function       
def speakToServer(self):
    while self.client_running:
        self.clientSocketLock.acquire()
        try:
            send_data = sys.stdin.readline()
            if send_data == "close\n":
                print "Disconnecting..."
                self.client_running = False
            else:
                self.client.send(send_data)
        except socket.error, (value,message):
            continue

        self.clientSocketLock.release()

if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()

Я понимаю, что вам нужно прочитать несколько строк кода, но, как я уже сказал, я думаю, что концепция и сценарий сами по себе должны быть довольно просты для понимания. Было бы очень приятно, если бы кто-нибудь помог мне правильно синхронизировать мои потоки =)

Заранее спасибо

--- Редактировать ---

OK. Итак, теперь я упростил свой код до того, что он просто содержит функции отправки и получения как на сервере, так и на клиентских модулях. Клиенты, подключающиеся к серверу, получают свои собственные потоки, а функции отправки и получения в обоих модулях работают в своих собственных отдельных потоках. Это работает как чудо, с функцией широковещания в модуле сервера, отображающей строки, которые он получает от одного клиента всем клиентам. Пока все хорошо!

Следующее, что я хочу, чтобы мой скрипт выполнял, - это принятие определенных команд, то есть «close», в клиентском модуле, чтобы завершить работу клиента и присоединить все запущенные потоки в списке потоков. Я использую флаг события, чтобы уведомить listenToServer и основной поток, что поток speakToServer прочитал вход «close». Кажется, что основной поток выпрыгивает из цикла while и запускает цикл for, который должен присоединиться к другим потокам. Но тут висит. Кажется, что цикл while в потоке listenToServer никогда не останавливается, даже если server_running должен быть установлен в False, когда установлен флаг события.

Я публикую здесь только клиентский модуль, потому что я полагаю, что ответ для синхронизации этих двух потоков будет касаться синхронизации большего количества потоков как в клиентском, так и в серверном модуле.

import select
import socket
import sys
import server_bygg0203
import threading
from time import sleep

class Client(threading.Thread):

#initializing client socket

def __init__(self,(client,address)):

threading.Thread.__init__(self) 
self.client = client 
self.address = address
self.size = 1024
self.client_running = False
self.running_threads = []
self.ClientSocketLock = None
self.disconnected = threading.Event()

def run(self):

#connect to server
self.client.connect(('localhost',50000))

#self.client.setblocking(0)
self.client_running = True

#making two threads, one for receiving messages from server...
listen = threading.Thread(target=self.listenToServer)

#...and one for sending messages to server
speak = threading.Thread(target=self.speakToServer)

#not actually sure what daemon means
listen.daemon = True
speak.daemon = True

#appending the threads to the thread-list
self.running_threads.append((listen,"listen"))
self.running_threads.append((speak, "speak"))
listen.start()
speak.start()

while self.client_running:

    #check if event is set, and if it is
    #set while statement to false

    if self.disconnected.isSet():
        self.client_running = False 

#closing the threads if the client goes down
print "Client operating on its own"
self.client.shutdown(1)
self.client.close()

#close threads

#the script hangs at the for-loop below, and
#refuses to close the listen-thread (and possibly
#also the speak thread, but it never gets that far)

for t in self.running_threads:
    print "Waiting for " + t[1] + " to close..."
    t[0].join()
self.disconnected.clear()
return

#defining "speak"-function      
def speakToServer(self):

#sends strings to server
while self.client_running:
    try:
        send_data = sys.stdin.readline()
        self.client.send(send_data)

        #I want the "close" command
        #to set an event flag, which is being read by all other threads,
        #and, at the same time set the while statement to false

        if send_data == "close\n":
            print "Disconnecting..."
            self.disconnected.set()
            self.client_running = False
    except socket.error, (value,message):
        continue
return

#defining "listen"-function
def listenToServer(self):

#receives strings from server
while self.client_running:

    #check if event is set, and if it is
    #set while statement to false

    if self.disconnected.isSet():
        self.client_running = False
    try:
        data_recvd = self.client.recv(self.size)
        print data_recvd
    except socket.error, (value,message):
        continue
return

if __name__ == "__main__":
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost'))
c.run()

Позже, когда я получу эту серверную / клиентскую систему, я буду использовать эту систему на некоторых моделях лифтов, которые у нас есть здесь в лаборатории, причем каждый клиент получает заказы на этаж или звонки «вверх» и «вниз». Сервер будет запускать алгоритм распределения и обновлять очереди лифта на клиентах, которые наиболее подходят для запрошенного заказа. Я понимаю, что это долгий путь, но я думаю, что нужно сделать один шаг за раз =)

Надеюсь, у кого-то есть время разобраться в этом. Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 02 марта 2012

Сделайте резервную копию своей работы, затем бросьте ее - частично.

Вы должны реализовать свою программу по частям и проверять каждую часть по ходу. Во-первых, займитесь частью input вашей программы. Не беспокойтесь о том, как транслировать полученные вами данные. Вместо этого переживайте, что вы можете успешно и многократно получать входные данные через сокет. Пока - так хорошо.

Теперь я предполагаю, что вы хотели бы отреагировать на этот вход, передав широковещание другим подключенным клиентам. Ну, очень плохо, ты пока не можешь этого сделать! Потому что я оставил одну незначительную деталь из абзаца выше. Вы должны разработать ПРОТОКОЛ .

Что такое протокол? Это набор правил для общения. Как ваш сервер узнает, когда клиент завершил отправку своих данных? Это заканчивается каким-то специальным символом? Или, возможно, вы кодируете размер сообщения, которое будет отправлено, как первый или два байта сообщения.

Оказывается, это большая работа, не так ли? : -)

Какой простой протокол. Протокол line-oriented прост. Читайте 1 символ за раз, пока не дойдете до конца терминатора записи - '\ n'. Таким образом, клиенты будут отправлять такие записи на ваш сервер -

HELO \ п MSG Дэйв, где ваши дети? \ N

Итак, если вы разработали этот простой протокол, реализуйте его. На данный момент, НЕ БЕСПОКОЙТЕСЬ О МНОГОЗВУКОВОЙ ПЕРСОНАЛЕ! Просто беспокойтесь о том, чтобы это заработало.

Ваш текущий протокол должен прочитать 1024 байта. Что может быть неплохо, просто убедитесь, что вы отправляете 1024-байтовые сообщения от клиента.

Как только вы настроите протокол, перейдите к реакции на вход. Но сейчас вам нужно что-то, что будет читать ввод. Как только это будет сделано, мы сможем что-то сделать с этим.

jdi прав, у вас слишком много программ для работы. Кусочки легче исправить.

1 голос
/ 01 марта 2012

Самая большая проблема, с которой я сталкиваюсь в этом коде, заключается в том, что у вас слишком много всего происходит, чтобы легко отладить вашу проблему. Потоки могут быть чрезвычайно сложными из-за нелинейности логики. Особенно, когда вам нужно беспокоиться о синхронизации с блокировками.

Причина, по которой вы видите, как клиенты блокируют друг друга, заключается в том, как вы используете serverSocketLock в цикле listenToClient () на сервере. Если честно, сейчас это не совсем ваша проблема с вашим кодом, но она стала проблемой, когда я начал отлаживать ее и превратил сокеты в блокирующие сокеты. Если вы помещаете каждое соединение в отдельный поток и читаете из них, то нет никакой причины использовать здесь глобальную блокировку сервера. Все они могут читать из своих собственных сокетов одновременно, что является целью потока.

Вот моя рекомендация для вас:

  1. Избавьтесь от всех лишних блокировок и лишних нитей и начните с самого начала
  2. Пусть клиенты подключатся, как вы, и поместите их в свою ветку, как вы. И просто пусть они отправляют данные каждую секунду. Убедитесь, что вы можете подключить и отправить более одного клиента, а ваш сервер зацикливается и получает. Как только эта часть заработает, вы можете перейти к следующей части.
  3. Прямо сейчас у вас установлены неблокируемые сокеты. Это заставляет их все очень быстро вращаться по своим циклам, когда данные не готовы. Поскольку вы работаете с потоками, вы должны установить их для блокировки. Тогда читатель темы просто будет сидеть и ждать данных и сразу же отвечать.

Блокировки используются, когда потоки будут получать доступ к общим ресурсам. Очевидно, что в любое время поток должен попытаться изменить атрибут сервера, например список или значение. Но не тогда, когда они работают над собственными сокетами.

Событие, которое вы используете для запуска своих читателей, здесь не кажется необходимым. Вы получили клиента, и вы запускаете поток позже. Итак, он готов к работе.

В двух словах ... упростите и протестируйте по одному. Когда это работает, добавьте больше. Сейчас слишком много потоков и блокировок.

Вот упрощенный пример вашего listenToClient метода:

def listenToClient(self, c):
    while self.server_running:
        try:
            recvd_data = c.client.recv(self.size)
            print "received:", c, recvd_data
            if recvd_data == "" or recvd_data == "close\n":
                print "Client " + str(c.address) + (" disconnected...")
                return

            print recvd_data

        except socket.error, (value, message):
            if value == 35:
                continue 
            else:
                print "Error:", value, message  
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...