Как использовать неблокирующие розетки - PullRequest
0 голосов
/ 02 октября 2019

Я пытаюсь написать неблокирующие серверные / клиентские скрипты.

Во-первых, вот мой код:

Server.py ->

import socket
import select
import threading

class ChatServer(threading.Thread):
    """
    SERVER THREAD
    """

    MAX_WAITING_CONNECTION = 10
    RECV_HEADER_LENGTH = 10

    def __init__(self, host, port):
        """
        Initialize new ChatServer

        :param host: Binding Host IP
        :param port: Binding Port number
        """


        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.connections = [] ## Will keep active client connections.
        self.clients = {}
        self.running = True

    def _bind_socket(self):
        """
        Creates the server socket and binds it to the given host and port
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(self.MAX_WAITING_CONNECTION)
        self.connections.append(self.server_socket)

    def _send(self, sock, client_message):
        """
        Prefixes each message with a 4-byte length before sending.

        :param sock: the incoming sock
        :param msg: the massage to send
        """
        user = self.clients[sock]
        client_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
        sock.send(client_message)

    def _receive(self, sock):
        try:
            ## Bytes type header
            message_header = sock.recv(self.RECV_HEADER_LENGTH)

            if not len(message_header):
                return False

            message_length = int(message_header.decode('utf-8').strip())
            ## Bytes type data
            return {"header": message_header, "data": sock.recv(message_length)}
        except Exception as e:
            print('exception occur')
            print(e)
            return False

    def _broadcast(self, sending_client_socket, client_message):
        """
        Breadcasts a message to all the clients different from both the server itself and
        the client sending the message.
        :param client_socket: the socket of the client sending the message
        :param client_message: the message to broadcast ({'header': <bytes header>, 'data': <bytes data message>})
        """

        for sock in self.clients:
            is_not_the_server = sock != self.server_socket
            is_not_the_client_sending = sock != sending_client_socket ## sending client socket

            if is_not_the_server and is_not_the_client_sending:
                try:
                    user = self.clients[sending_client_socket]
                    print(f"Type client_message: {type(client_message)}")
                    print(f"Type user: {type(user)}")
                    sending_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
                    sock.send(sending_message)
                except socket.error:
                    ## handles a possible disconnection of client "sock" by ..
                    sock.close()
                    self.connections.remove(sock) ## removing sock form active connections.
                    del self.clients[sock]

    def _log(self, sock, message):
        user = self.clients[sock]
        print(f"Received message from {user['data'].decode('utf-8')}: {message['data'].decode('utf-8')}")

    def _run(self):
        """
        Actually runs the server.
        """
        while self.running:
            ## Get the list of sockets which are ready to be read through select non-blocking calls
            ## The select has a timeout of 60 seconds

            try:
                ready_to_read, ready_to_write, in_error = select.select(self.connections, [], self.connections)
            except socket.error as e:
                print(f"General Error: {e}")
                continue
            else:
                for sock in ready_to_read:
                    ## if socket is server socket.
                    if sock == self.server_socket:
                        try:
                            client_socket, client_address = self.server_socket.accept()
                        except socket.error as e:
                            print(f"General Error: {e}")
                            break
                        else:
                            user = self._receive(client_socket)
                            if user is False:
                                continue
                            self.connections.append(client_socket)
                            self.clients[client_socket] = user
                            print(f"Accepted new connection from {client_address[0]}:{client_address[1]}..")
                    else:
                        message = self._receive(sock) ## Get client message
                        if message is False:
                            print(f"Closed connection from {self.clients[sock]['data'].decode('utf-8')}")
                            self.connections.remove(sock)
                            del self.clients[sock]
                            continue

                        self._log(sock, message)
                        print(message)
                        self._broadcast(sock, message)

                for sock in in_error:
                    self.connections.remove(sock)
                    del self.clients[sock]
        self.stop()

    def run(self):
        """
        Given a host and a port, binds the socket and runs the server.
        """
        self._bind_socket()
        self._run()

    def stop():
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        self.server_socket.close()


if __name__ == '__main__':

    _HOST = '127.0.0.1'
    _PORT = 6667

    chat_server = ChatServer(_HOST, _PORT)
    chat_server.start()
    chat_server.join()

Имой client.py ->

import socket
import select
import errno
import threading
import sys

RECV_HEADER_LENGTH = 10

class ChatClient(threading.Thread):


    def __init__(self, host, port):
        """
        Initialize new ChatClient

        :param host: Connect Host IP
        :param port: Connect Port number
        """

        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.username = input("Username: ")
        self.running = True

    def _send(self, sock, message):
        sock.send(message.encode('utf-8'))  

    def _connect(self):
        """
        Connecting to the ChatServer
        """
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((self.host, self.port))
        self.client_socket.setblocking(0)
        self.username_header = f"{len(self.username):<{RECV_HEADER_LENGTH}}"
        self._send(self.client_socket, self.username_header+self.username)

    def prompt(self) :
        sys.stdout.write(f"#{self.username}$ ")
        sys.stdout.flush()

    def _run(self):
        """
        Actually run client.
        """
        while self.running:
            reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket], [self.client_socket], [])
            for sock in reading_sockets:
                if sock == self.client_socket:
                    username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    if not len(username_header):
                        print("Connection closed by the server.")
                        sys.exit()
                    username_length = int(username_header.decode("utf-8").strip())
                    username = self.client_socket.recv(username_length).decode("utf-8")
                    message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    message_length = int(message_header.decode('utf-8').strip())
                    message = self.client_socket.recv(message_length).decode('utf-8')
                    print(f"#{username}$ {message}")
            for sock in writing_sockets:
                self.prompt()
                message = input()
                print(len(message))
                if not message:
                    continue
                message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
                self._send(sock, message_header+message)
        self.stop()

    def run(self):
        """
        Given a host and a port, binds the socket and runs the server.
        """
        self._connect()
        self._run()

    def stop():
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        self.client_socket.close()


if __name__ == '__main__':

    _HOST = '127.0.0.1'
    _PORT = 6667

    chat_server = ChatClient(_HOST, _PORT)
    chat_server.start()
    chat_server.join()

Теперь моя проблема на client.py, я думаю. В функции _run я использую select reading_socket и writing_socket для одного и того же сокета.

Когда я запускаю этот код, блокировка цикла для чтения_зекета. Потому что в цикле for для write_sockets держите мою оболочку и никогда не выпускайте, даже приходит другой массаж. Поэтому я хочу дождаться ввода пользователя, но в то же время читать другие сообщения и печатать на оболочке. Я использую python3.7. Как мне этого добиться?

1 Ответ

0 голосов
/ 02 октября 2019

Итак, я хочу подождать ввода пользователя, но в то же время читать другие сообщения и печатать на оболочке. Я использую python3.7. Как я могу добиться этого?

Удостоверьтесь, что читаете только из sys.stdin, когда sys.stdin действительно имеет пользовательский ввод, готовый дать вам;таким образом, ваш input() звонок не будет блокироваться. Вы можете сделать это, передав sys.stdin в качестве одного из сокетов в вашем первом аргументе select(). (Примечание: это не будет работать под Windows, потому что Microsoft по своему усмотрению решила, что их реализация select() не будет поддерживать selection-on- stdin. В Windows вам придется использовать отдельный поток для блокировки чтениявместо stdin, вместе с каким-либо обменом сообщениями между потоками, чтобы получить данные, прочитанные из stdin, обратно в сетевой поток, и очень трудно начать работать)

Вот как я изменилВаш _run(self) метод для получения желаемого поведения (проверено в MacOS / X):

def _run(self):
    """
    Actually run client.
    """
    while self.running:
        reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket, sys.stdin], [], [])
        for sock in reading_sockets:
            if sock == self.client_socket:
                username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                if not len(username_header):
                    print("Connection closed by the server.")
                    sys.exit()
                username_length = int(username_header.decode("utf-8").strip())
                username = self.client_socket.recv(username_length).decode("utf-8")
                message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                message_length = int(message_header.decode('utf-8').strip())
                message = self.client_socket.recv(message_length).decode('utf-8')
                print(f"#{username}$ {message}")
            elif sock == sys.stdin:
               self.prompt()
               message = input()
               print(len(message))
               if not message:
                   continue
               message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
               self._send(self.client_socket, message_header+message)
    self.stop()

Обратите внимание, что я добавил sys.stdin в аргумент read-sockets вызова select() (так, чтобыselect() вернется, когда их данные будут готовы для чтения из stdin), и что я также удалил self.client_socket из аргумента write-sockets (потому что его размещение приведет к тому, что select() вернется, как только self.client_socket имеет буферное пространство для приема большего количества исходящих данных, то есть оно будет возвращаться немедленно почти все время, что приведет к вращению вашего цикла обработки событий и заставит вашу клиентскую программу использовать почти 100% отcore (это не то, что вам нужно).

Я также изменил ваш код read-from-stdin так, чтобы он вызывался только тогда, когда читаемый сокет равен sys.stdin, так как нет смысла пытаться читать из stdin, если он не имеетданные, чтобы дать вам;и, наконец, у меня есть ваш self._send() вызов отправки на сокете TCP, вместо того, чтобы пытаться отправить байты обратно в stdin (потому что stdin предназначен только для чтения / ввода, поэтому отправка байтов в него не имеет никакого смысла). *

...