Многопоточные сокеты с центральным релейным сервером - PullRequest
0 голосов
/ 27 декабря 2018

Ранее мне удавалось реализовать скрипт сокета клиент-сервер, который передает сообщения между одним клиентом и сервером, и сейчас я пытаюсь реализовать систему с несколькими клиентами.

Более конкретно, я бынравится использовать сервер как своего рода посредник между двумя клиентами, который получает информацию от одного клиента и передает ее другому.Я попытался присоединить и отправить номер порта принимающего клиента, а затем извлечь его из сообщения на стороне сервера.После этого я попытался бы отправить его в любой сокет с таким номером порта, но у меня возникли некоторые проблемы (поскольку номера портов определяются на момент отправки, я полагаю?), Так что теперь я просто пытаюсь ретранслировать отправленное сообщение обратно.всем клиентам.Однако проблема заключается в том, что сообщение отправляется только на сервер и не передается нужному клиенту.

Ранее я пытался внедрить одноранговую систему, но столкнулся с проблемой, поэтомурешил сделать шаг назад и сделать это вместо этого.

Server.py:

import socket, _thread, threading
import tkinter as tk

SERVERPORT = 8600
HOST = 'localhost'

class Server():
    def __init__(self):
        self.Connected = True
        self.ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
        self.ServerSocket.bind((HOST, SERVERPORT))
        self.ServerSocket.listen(2)
        self.Clients = []

    def Listen(self):
        print('Server is now running')
        while self.Connected:
            ClientSocket, Address = self.ServerSocket.accept()
            self.Clients.append(Address)
            print('\nNew user connected', Address)
            t = threading.Thread(target=self.NewClient, args=(ClientSocket,
                                                              Address))
            t.daemon = True
            t.start()
        self.Socket.close()

    def NewClient(self, ClientSocket, Address):
        while self.Connected:
            if ClientSocket:
                try:
                    ReceivedMsg = ClientSocket.recv(4096)
                    print('Message received from', Address, ':', ReceivedMsg)
                    self.Acknowledge(ClientSocket, Address)
                    if ReceivedMsg.decode('utf8').split()[-1] != 'message':
                        ReceiverPort = self.GetSendPort(ReceivedMsg)
                        self.SendToClient(ClientSocket,ReceivedMsg,ReceiverPort)
                except:
                    print('Connection closed')
                    raise Exception
        ClientSocket.close()

    def Acknowledge(self, Socket, Address):
        Socket.sendto(b'The server received your message', Address)

    def GetSendPort(self, Msg):
        MsgDigest = Msg.decode('utf8').split()
        return int(MsgDigest[-1])

    def SendToClient(self, Socket, Msg, Port):
        Addr = (HOST, Msg) 
        for Client in self.Clients:
            Socket.sendto(Msg, Client)

def NewThread(Func, *args):
    if len(args) == 1:
        t = threading.Thread(target=Func, args=(args,))
    elif len(args) > 1:
        t = threading.Thread(target=Func, args=args)
    else:
        t = threading.Thread(target=Func)
    t.daemon = True
    t.start()
    t.join()

Host = Server()
NewThread(Host.Listen)

И клиент (.py):

import socket, threading
import tkinter as tk

Username = 'Ernest'
PORT = 8601
OtherPORT = 8602
SERVERPORT = 8600
HOST = '127.0.0.1'

class Client():
    def __init__(self, Username):
        self.Connected, self.Username = False, Username
        self.Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def Connect(self):
        print('Trying to connect')
        try:
            self.Socket.connect((HOST, SERVERPORT))
            self.Connected = True
            print(self.Username, 'connected to server')            
            Msg = MsgUI(self.Username)
            Msg.Display()            
        except Exception:
            print('Could not connect to server')
            raise Exception

    def SendMsg(self):
        if self.Connected:
            Msg = '{} sent you a message {}'.format(self.Username, OtherPORT)
            self.Socket.sendall(bytes(Msg, encoding='utf8'))
            self.GetResponse()

    def GetResponse(self, *args):
        AckMsg = '\n{} received the message'.format(self.Username)
        NMsg = '\n{} did not receive the message'.format(self.Username)
        if self.Connected:
            Msg = self.Socket.recv(4096)
            print(Msg)
            if Msg:
                self.Socket.sendall(bytes(AckMsg, encoding='utf8'))
            else:
                self.Socket.sendall(bytes(NMsg, encoding='utf8'))

class MsgUI():
    def __init__(self, Username):
        self.Username = Username
        self.entry = tk.Entry(win)
        self.sendbtn = tk.Button(win, text='send', command=Peer.SendMsg)

    def Display(self):
        self.entry.grid()
        self.sendbtn.grid()
        win.mainloop()

win = tk.Tk()
Peer = Client(Username)
Peer.Connect()

Я хочусообщение отправляется всякий раз, когда пользователь нажимает кнопку отправки в окне tkinter, но в то же время он постоянно «прослушивает», чтобы увидеть, получено ли оно каких-либо сообщений.

Ранее я также пытался запустить метод GetResponse в клиенте в другом потоке, и вместо if self.Connected я использовал while self.Connected, но он все еще не работал.

ОБНОВЛЕНИЕ

После некоторых полезных комментариев я отредактировал два файла следующим образом: Сервер теперь содержит два сокета для каждого клиента, который запускается первым.Файл сервера импортируется в файл клиента как модуль.Затем запускается каждый файл клиента, и каждый клиент запускает функцию в файле сервера, запрашивая использование сокета.Если запрос разрешен (т.е. не было выдано никакой ошибки), сокет подключается, добавляется в набор клиентов, сохраненных в файле сервера, а затем возвращается в файл клиента.Затем клиент использует этот сокет для отправки и получения сообщений.

Server.py

import socket, _thread, threading
import tkinter as tk

SERVERPORT = 8600
HOST = 'localhost'

class Server():
    def __init__(self):
        self.Connected = True
        self.ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
        self.ServerSocket.bind((HOST, SERVERPORT))
        self.ServerSocket.listen(2)
        self.Clients = {}

    def ConnectClient(self, Username, Port):
        Socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.Clients[Username] = [Socket, Port, False]
        try:
            self.Clients[Username][0].connect((HOST, SERVERPORT))
            self.Clients[Username][2] = True
            print('Opened port for user', Username)
            return Socket
        except Exception:
            print('Could not open port for user', Username)
            raise Exception

    def Listen(self):
        print('Server is now running')
        while self.Connected:
            ClientSocket, Address = self.ServerSocket.accept()
            print('\nNew user connected', Address)
            t = threading.Thread(target=self.NewClient, args=(ClientSocket,
                                                              Address))
            t.daemon = True
            t.start()
        self.Socket.close()

    def NewClient(self, ClientSocket, Address):
        while self.Connected:
            if ClientSocket:
                try:
                    ReceivedMsg = ClientSocket.recv(4096)
                    if b'attempting to connect to the server' in ReceivedMsg:
                        ClientSocket.send(b'You are now connected to the server')
                    else:
                        print('Message received from', Address, ':',ReceivedMsg)
                        #self.Acknowledge(ClientSocket, Address)
                        ReceiverPort = self.GetSendPort(ReceivedMsg)
                        if ReceiverPort != None:
                            self.SendToClient(ClientSocket,ReceivedMsg,
                                              ReceiverPort)
                except:
                    print('Connection closed')
                    raise Exception
        ClientSocket.close()

    def Acknowledge(self, Socket, Address):
        Socket.sendto(b'The server received your message', Address)

    def GetSendPort(self, Msg):
        MsgDigest = Msg.decode('utf8').split()
        try:
            Port = int(MsgDigest[-1])
        except ValueError:
            Port = None
        return Port

    def SendToClient(self, Socket, Msg, Port):
        Addr = (HOST, Port)
        Receiver = None
        for Client, Vars in self.Clients.items():
            if Vars[1] == Port:
                Receiver = Client
        self.Clients[Receiver][0].sendto(Msg, Addr)

def NewThread(Func, *args):
    if len(args) == 1:
        t = threading.Thread(target=Func, args=(args,))
    elif len(args) > 1:
        t = threading.Thread(target=Func, args=args)
    else:
        t = threading.Thread(target=Func)
    t.daemon = True
    t.start()
    t.join()

Host = Server()
if __name__ == '__main__':
    NewThread(Host.Listen)

И Client.py

import socket, threading, Server
import tkinter as tk

Username = 'Ernest'
PORT = 8601
OtherPORT = 8602
SERVERPORT = 8600
HOST = '127.0.0.1'

class Client():
    def __init__(self, Username):
        self.Connected, self.Username = False, Username

    def Connect(self):
        print('Requesting to connect to server')
        try:
            self.Socket = Server.Host.ConnectClient(self.Username, PORT)
            self.Connected = Server.Host.Clients[self.Username][2]
            Msg = '{} is attempting to connect to the server'.format(self.Username)
            self.Socket.sendall(bytes(Msg, encoding='utf8'))            
            ReceivedMsg = self.Socket.recv(4096)
            print(ReceivedMsg)
            Msg = MsgUI(self.Username)
            Msg.Display()            
        except Exception:
            print('Could not connect to server')
            raise Exception

    def SendMsg(self):
        try:
            if self.Connected:
                Msg = '{} sent you a message {}'.format(self.Username,OtherPORT)
                self.Socket.sendall(bytes(Msg, encoding='utf8'))
                self.GetResponse()
        except Exception:
            print('Connection closed')
            raise Exception

    def GetResponse(self, *args):
        AckMsg = '\n{} received the message'.format(self.Username)
        NMsg = '\n{} did not receive the message'.format(self.Username)
        if self.Connected:
            Msg = self.Socket.recv(4096)
            print(Msg)
            if Msg:
                self.Socket.sendall(bytes(AckMsg, encoding='utf8'))
            else:
                self.Socket.sendall(bytes(NMsg, encoding='utf8'))

class MsgUI():
    def __init__(self, Username):
        self.Username = Username
        self.entry = tk.Entry(win)
        self.sendbtn = tk.Button(win, text='send', command=Peer.SendMsg)

    def Display(self):
        self.entry.grid()
        self.sendbtn.grid()
        win.mainloop()

win = tk.Tk()
Peer = Client(Username)
Peer.Connect()

Теперь проблема скорее заключается впитон и проблема объема.При попытке передать сообщение обратно клиенту я получил KeyError, так как словарь Clients все еще был пуст.При выполнении вызова функции на сервере в файле клиента становится ясно, что обновление словаря происходит в файле клиента, а не в файле сервера, который находится в другом экземпляре.Мне нужен метод изменения содержимого словаря Clients, который вызывается для действия клиентским файлом, но вступает в силу в файле сервера.

1 Ответ

0 голосов
/ 28 декабря 2018

Вы привержены многопоточности?Потоки не запускаются одновременно в python (из-за GIL), и хотя они являются одним из способов обработки параллельных операций, они не являются единственным способом и, как правило, не лучшим способом, если только они не являются единственным способом,Рассмотрим этот код, который плохо обрабатывает случаи сбоев, но, кажется, работает как отправная точка.

import socket, select, Queue

svrsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
svrsock.setblocking(0)
svrsock.bind(('', 17654))
svrsock.listen(16)
client_queues = {}
write_ready=[] # we'll update this for clients only that have things in the queue
while client_queues.keys() + [svrsock] :
  readable, writable, exceptional = select.select(client_queues.keys() + [svrsock] , write_ready, [])
  for rd in readable:
    if rd is svrsock: # reading listening socket == accepting connection
      conn, addr = svrsock.accept()
      print("Connection from {}".format(addr))
      conn.setblocking(0)
      client_queues[conn] = Queue.Queue()
    else:
      data = rd.recv(1024)
      if data:
        # TODO: send to all queues
        print("Message from {}".format(rd.getpeername()))
        for sock, q in client_queues.iteritems(): 
          q.put("From {}: {}".format( rd.getpeername(), data))
          if sock not in write_ready:
            write_ready.append(sock)
  for rw in writable:
    try:
      data = client_queues[rw].get_nowait()
      rw.send(data)
    except Queue.Empty:
      write_ready.remove(rw)
      continue

Концепция довольно проста.Сервер принимает соединения;Каждое соединение (сокет) связано с очередью ожидающих сообщений.Каждый сокет, который готов для чтения, читается из, и его сообщение добавляется в очередь каждого клиента.Клиент-получатель добавляется в список write_ready клиентов с ожидающими данными, если его там еще нет.Затем каждому сокету, который готов к записи, записывается следующее сообщение в очереди.Если сообщений больше нет, получатель удаляется из списка write_ready.

Это очень легко организовать, если вы не используете многопоточность, потому что вся координация присуща порядку приложения.С потоками было бы сложнее и намного больше кода, но, вероятно, не больше производительности из-за gil.

Секрет одновременной обработки нескольких потоков ввода / вывода без многопоточности select.В принципе это довольно просто;мы передаем select() список возможных сокетов для чтения, другой список возможных сокетов для записи и окончательный список, который для этой упрощенной демонстрации я полностью игнорирую.Результаты вызова select будут включать один или несколько сокетов, которые на самом деле готовы к чтению или записи, что позволяет мне блокировать, пока один или несколько сокетов не будут готовы к работе.Затем я обрабатываю все сокеты, готовые к работе каждый проход (но они уже отфильтрованы до тех, которые не будут блокироваться).

Здесь еще предстоит сделать тонну.Я не убираю за собой, не отслеживаю закрытые соединения, не обрабатываю никаких исключений и так далее.но не беспокоясь о многопоточности и гарантиях параллелизма, довольно легко начать устранять эти недостатки.

Вот оно "в действии".Здесь для клиентской стороны я использую netcat, который идеально подходит для тестирования уровня 3 без протоколов уровня 4+ (другими словами, так называемый raw tcp).Он просто открывает сокет для данного пункта назначения и порта и отправляет свой stdin через сокет и отправляет данные своего сокета в stdout, что делает его идеальным для демонстрации этого серверного приложения!

Я также хотел бы отметить, что связывание кода между сервером и клиентом нецелесообразно, поскольку вы не сможете развернуть изменения в одном из них, не нарушив другого.Идеально иметь «контракт» между сервером и клиентом и поддерживать его.Даже если вы реализуете поведение сервера и клиента в одной и той же базе кода, вы должны использовать коммуникационный контракт tcp для управления своей реализацией, а не для совместного использования кода.Просто мои 2 цента, но как только вы начинаете делиться кодом, вы часто начинаете связывать версии сервера / клиента так, как вы этого не ожидали.

сервер:

$ python ./svr.py
Connection from ('127.0.0.1', 52059)
Connection from ('127.0.0.1', 52061)
Message from ('127.0.0.1', 52061)
Message from ('127.0.0.1', 52059)
Message from ('127.0.0.1', 52059)

Первый клиент (52059):

$ nc localhost 17654
hello
From ('127.0.0.1', 52061): hello
From ('127.0.0.1', 52059): hello
From ('127.0.0.1', 52059): hello

Второй клиент:

$ nc localhost 17654
From ('127.0.0.1', 52061): hello
hello
From ('127.0.0.1', 52059): hello
hello
From ('127.0.0.1', 52059): hello

Если вам нужно более убедительнопочему select является более убедительным, чем одновременное выполнение, рассмотрим это: Apache основан на модели потоков, другими словами, каждое соединение получает рабочий поток.nginx основан на модели select, поэтому вы можете видеть, насколько быстрее это может быть.Нельзя сказать, что nginx по своей природе лучше, поскольку Apache извлекает выгоду из модели многопоточности из-за интенсивного использования модулей для расширения возможностей (например, mod_php), тогда как nginx не имеет этого ограничения и может обрабатывать все запросы из любого потока.Но исходная производительность nginx обычно считается намного выше и гораздо более эффективной, и главная причина этого заключается в том, что он избегает почти всех переключений контекста процессора, присущих apache.Это правильный подход!

Слово о масштабировании.Очевидно, это не будет масштабироваться вечно.Также не была бы многопоточная модель;в конце концов у вас закончились темы.Более распределенная и высокопроизводительная система, вероятно, будет использовать какой-либо механизм Pub / Sub, который будет выгружать отслеживание клиентских соединений и очереди сообщений с сервера на уровень данных pub / sub и позволит восстанавливать соединения и помещать в очередь данные для отправки,а также добавление нескольких серверов за балансировщиком нагрузки.Просто выбрасываю это туда.Вы можете быть приятно удивлены тем, насколько хорошо select может масштабироваться (процессор в любом случае намного быстрее, чем сеть, что, вероятно, не является узким местом).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...