Всегда ли доступен для записи неблочный сокет после завершения соединения в python? - PullRequest
0 голосов
/ 28 мая 2020

Я хочу создать инструмент карты портов на python с нуля. По сути, это TCP-прокси, который передает весь трафик c между клиентом и целевой службой или приложением.

В частности, для каждого соединения я создаю два сокета, которые отвечают за связь с клиентом и целевой службой соответственно. Чтобы реализовать функцию мультиплексирования ввода-вывода, я использую модуль selectors для отслеживания событий EVENT_READ и EVENT_WRITE на этих двух сокетах. Однако я обнаружил, что сокеты всегда доступны для записи, поэтому основной l oop вообще не блокируется. Это нормально? Мой код выглядит следующим образом:

import socket
import selectors

def recv_from(sock):
    data = b''
    try:
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            data += chunk
    except:
        pass
    return data

class RelayHandlder:

    def __init__(self, client_sock, remote_sock, selector):
        '''
        client_sock and remote_sock have already finished the connection.
        '''
        self._client_sock = client_sock
        self._remote_sock = remote_sock
        self._selector = selector

        self._send_buffer = b''
        self._recv_buffer = b''

        self._selector.register(self._client_sock, selectors.EVENT_READ|selectors.EVENT_WRITE, self._client_handler)
        self._selector.register(self._remote_sock, selectors.EVENT_READ|selectors.EVENT_WRITE, self._remote_handler)

    def _client_handler(self, client_sock, mask):
        if mask & selectors.EVENT_READ:
            data = recv_from(client_sock)
            if data:                
                self._send_buffer = data
            else:
                self._close()
        elif mask & selectors.EVENT_WRITE:
            if self._recv_buffer:
                try:
                    client_sock.send(self._recv_buffer)
                    self._recv_buffer = b''
                except OSError:
                    self._close()

    def _remote_handler(self, remote_sock, mask):
        if mask & selectors.EVENT_READ:
            data = recv_from(remote_sock)
            if data:
                self._recv_buffer = data
            else:
                self._close()
        elif mask & selectors.EVENT_WRITE:
            if self._send_buffer:
                try:
                    remote_sock.send(self._send_buffer)
                    self._send_buffer = b''
                except OSError:
                    self._close()

    def _close(self):
        print('Closing ...')
        self._selector.unregister(self._client_sock)
        self._client_sock.close()
        self._selector.unregister(self._remote_sock)
        self._remote_sock.close()
        self._send_buffer = b''
        self._recv_buffer = b''

class PortMapper:
    '''
    Map the remote port to local.
    '''

    def __init__(self, proxy_ip, proxy_port, remote_ip, remote_port):
        self.proxy_ip = proxy_ip
        self.proxy_port = proxy_port
        self.remote_ip = remote_ip
        self.remote_port = remote_port

        self._selector = selectors.DefaultSelector()

        self._proxy_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        while True:
            try:
                self._proxy_sock.bind((proxy_ip, proxy_port))
                break
            except OSError:
                proxy_port += 1
                self.proxy_port = proxy_port
        self._proxy_sock.listen(10)
        self._proxy_sock.setblocking(False)
        self._selector.register(self._proxy_sock, selectors.EVENT_READ, self._accept_handler)
        print('Listening at {}:{}'.format(proxy_ip, proxy_port))

    def _accept_handler(self, proxy_sock, mask):
        client_sock, addr = proxy_sock.accept()
        client_sock.setblocking(False)
        print('Accept from {}'.format(addr))

        remote_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        remote_sock.setblocking(False)
        try:
            remote_sock.connect((self.remote_ip, self.remote_port))
        except BlockingIOError:
            pass

        RelayHandlder(client_sock, remote_sock, self._selector)

    def loop(self):
        while True:
            events = self._selector.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Port Map Tool.')
    parser.add_argument('-H', '--remote-host', required=True, type=str, help='Remote host.')
    parser.add_argument('-P', '--remote-port', required=True, type=int, help='Remote port.')
    parser.add_argument('-p', '--local-port', default=1000, type=int, help='Local port.')
    args = parser.parse_args()

    PortMapper('0.0.0.0', args.local_port, args.remote_host, args.remote_port).loop()

В коде self._send_buffer в RelayHandler используется для кеширования данных, полученных от клиента. Если self._remote_sock доступен для записи, а self._send_buffer не пуст, прокси отправит self._send_buffer удаленной службе. Logi c аналогичен self._client_sock. Основной l oop определен в функции loop в PortMapper.

У меня два вопроса:

  • Является ли неблочный сокет доступным для записи после завершения подключение в python?
  • В приведенном выше коде сокеты всегда доступны для записи, а буферы часто пусты. Таким образом, для каждого отдельного l oop в основном l oop, self._selector.select() всегда будет возвращаться без блокировки, а обратный вызов выполняется, чтобы ничего не делать, а только для того, чтобы увидеть, пуст ли буфер, что может повредить производительности. Есть ли лучший метод или структура для этого?

1 Ответ

1 голос
/ 28 мая 2020

Сокет обычно доступен для записи до заполнения системного буфера. Это причина того, почему многие простые select мультиплексированные системы рассматривают только часть чтения и предполагают, что они смогут записать или принять возможность блокировки, если это не так.

Если вы хотите быть супер безопасно и убедитесь, что вы сможете писать, вам следует игнорировать EVENT_WRITE, если вы не готовы что-то написать. Но чтобы ваш код не исчерпал локальный буфер памяти, реле должно прекратить чтение (также игнорировать EVENT_READ), если другой канал не может писать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...