Я хочу создать инструмент карты портов на python с нуля. По сути, это TCP-прокси, который передает весь трафик c между клиентом и целевой службой или приложением.
В частности, для каждого соединения я создаю два сокета, которые отвечают за связь с клиентом и целевой службой соответственно. Чтобы реализовать функцию мультиплексирования ввода-вывода, я использую модуль selectors
для отслеживания событий EVENT_READ
и EVENT_WRITE
на этих двух сокетах. Однако я обнаружил, что сокеты всегда доступны для записи, поэтому основной l oop вообще не блокируется. Это нормально? Мой код выглядит следующим образом:
import socket
import selectors
def recv_from(sock):
data = b''
try:
while True:
chunk = sock.recv(4096)
if not chunk:
break
data += chunk
except:
pass
return data
class RelayHandlder:
def __init__(self, client_sock, remote_sock, selector):
'''
client_sock and remote_sock have already finished the connection.
'''
self._client_sock = client_sock
self._remote_sock = remote_sock
self._selector = selector
self._send_buffer = b''
self._recv_buffer = b''
self._selector.register(self._client_sock, selectors.EVENT_READ|selectors.EVENT_WRITE, self._client_handler)
self._selector.register(self._remote_sock, selectors.EVENT_READ|selectors.EVENT_WRITE, self._remote_handler)
def _client_handler(self, client_sock, mask):
if mask & selectors.EVENT_READ:
data = recv_from(client_sock)
if data:
self._send_buffer = data
else:
self._close()
elif mask & selectors.EVENT_WRITE:
if self._recv_buffer:
try:
client_sock.send(self._recv_buffer)
self._recv_buffer = b''
except OSError:
self._close()
def _remote_handler(self, remote_sock, mask):
if mask & selectors.EVENT_READ:
data = recv_from(remote_sock)
if data:
self._recv_buffer = data
else:
self._close()
elif mask & selectors.EVENT_WRITE:
if self._send_buffer:
try:
remote_sock.send(self._send_buffer)
self._send_buffer = b''
except OSError:
self._close()
def _close(self):
print('Closing ...')
self._selector.unregister(self._client_sock)
self._client_sock.close()
self._selector.unregister(self._remote_sock)
self._remote_sock.close()
self._send_buffer = b''
self._recv_buffer = b''
class PortMapper:
'''
Map the remote port to local.
'''
def __init__(self, proxy_ip, proxy_port, remote_ip, remote_port):
self.proxy_ip = proxy_ip
self.proxy_port = proxy_port
self.remote_ip = remote_ip
self.remote_port = remote_port
self._selector = selectors.DefaultSelector()
self._proxy_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
self._proxy_sock.bind((proxy_ip, proxy_port))
break
except OSError:
proxy_port += 1
self.proxy_port = proxy_port
self._proxy_sock.listen(10)
self._proxy_sock.setblocking(False)
self._selector.register(self._proxy_sock, selectors.EVENT_READ, self._accept_handler)
print('Listening at {}:{}'.format(proxy_ip, proxy_port))
def _accept_handler(self, proxy_sock, mask):
client_sock, addr = proxy_sock.accept()
client_sock.setblocking(False)
print('Accept from {}'.format(addr))
remote_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_sock.setblocking(False)
try:
remote_sock.connect((self.remote_ip, self.remote_port))
except BlockingIOError:
pass
RelayHandlder(client_sock, remote_sock, self._selector)
def loop(self):
while True:
events = self._selector.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Port Map Tool.')
parser.add_argument('-H', '--remote-host', required=True, type=str, help='Remote host.')
parser.add_argument('-P', '--remote-port', required=True, type=int, help='Remote port.')
parser.add_argument('-p', '--local-port', default=1000, type=int, help='Local port.')
args = parser.parse_args()
PortMapper('0.0.0.0', args.local_port, args.remote_host, args.remote_port).loop()
В коде self._send_buffer
в RelayHandler
используется для кеширования данных, полученных от клиента. Если self._remote_sock
доступен для записи, а self._send_buffer
не пуст, прокси отправит self._send_buffer
удаленной службе. Logi c аналогичен self._client_sock
. Основной l oop определен в функции loop
в PortMapper
.
У меня два вопроса:
- Является ли неблочный сокет доступным для записи после завершения подключение в python?
- В приведенном выше коде сокеты всегда доступны для записи, а буферы часто пусты. Таким образом, для каждого отдельного l oop в основном l oop,
self._selector.select()
всегда будет возвращаться без блокировки, а обратный вызов выполняется, чтобы ничего не делать, а только для того, чтобы увидеть, пуст ли буфер, что может повредить производительности. Есть ли лучший метод или структура для этого?