Python - неблокирующие сокеты с использованием селекторов - PullRequest
0 голосов
/ 29 октября 2018

Моя проблема вкратце: Я не знаю, как селектор знает, какой сокет должен читать или писать первым.

Это сервер, который может обрабатывать несколько соединений, и его поток должен быть:

  1. Сервер создает сокет прослушивания
  2. Клиент создает 2 сокета и подключает их к серверу
  3. Клиент 2 сокета отправляют сообщения
  4. Сервер 2 сокета повторяют эти сообщения, закрытие клиента и сервера подключение

что и происходит, но если созданные серверные сокеты будут писать первыми, соединение будет немедленно закрыто или вызовет исключение (?), Так как оно даже не вызывает send, а клиентский сокет ничего не получит. Итак, как селектор узнает, какие сокеты должны быть готовы для записи / чтения в первую очередь? Какую информацию мне не хватает, чтобы понять это?

Сервер:

import socket
import selectors
import types

host = "127.0.0.1"
port = 63210

def accept_wrapper(sock):
    conn, addr = sock.accept()
    print('accepted connection from', addr)
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    sel.register(conn, events, data=data)

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)
        if recv_data:
            data.outb += recv_data
        else:
            print('closing connection to', data.addr)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print('echoing', repr(data.outb), 'to', data.addr)
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

sel = selectors.DefaultSelector()

lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)

while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        if key.data is None:
            accept_wrapper(key.fileobj)
        else:
            service_connection(key, mask)

Клиент:

import socket
import selectors
import types

host = "127.0.0.1"
port = 63210
num_conns = 2
messages = [b'Message 1 from client.', b'Message 2 from client.']

def start_connections(host, port, num_conns):
    server_addr = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print('starting connection', connid, 'to', server_addr)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.connect_ex(server_addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(connid=connid,
                                     msg_total=sum(len(m) for m in messages),
                                     recv_total=0,
                                     messages=list(messages),
                                     outb=b'')
        sel.register(sock, events, data=data)

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)
        if recv_data:
            print('received', repr(recv_data), 'from connection', data.connid)
            data.recv_total += len(recv_data)
        if not recv_data or data.recv_total == data.msg_total:
            print('closing connection', data.connid)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if not data.outb and data.messages:
            data.outb = data.messages.pop(0)
        if data.outb:
            print('sending', repr(data.outb), 'to connection', data.connid)
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

sel = selectors.DefaultSelector()
start_connections(host, port, num_conns)

while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        service_connection(key, mask)

1 Ответ

0 голосов
/ 29 октября 2018

Сокеты на самом деле не пишут напрямую одноранговому узлу и не читают с однорангового узла. Вместо этого они записывают в локальный буфер записи конкретного сокета и читают из буфера чтения конкретного сокета. Ядро ОС заботится о доставке данных из буфера записи сокета в одноранговый узел и помещает полученные пакеты от однорангового узла в приемный буфер сокетов.

Состояние этих буферов сокетов в ядре и изменения этих буферов можно отслеживать с помощью таких функций, как select, poll, kqueue. По сути: сокет считается доступным для записи, если в буфере записи в сокеты есть место. Сокет считается читаемым, если в буфере чтения сокетов есть данные.

...