Как ждать события ошибки / закрытия для сокета с asyncio? - PullRequest
0 голосов
/ 29 мая 2019

Я использую сетевую библиотеку, которая предоставляет оболочку для использования функций сопрограммы с asyncio.Когда я написал тест, который случайным образом закрывает соединение (чтобы проверить, устойчива ли моя программа в плохих условиях), я обнаружил, что оно зависает бесконечно.

Это выглядело как ошибка в оболочке, предоставленной библиотекойиспользуя, потому что программа зависает в ожидании обратного вызова от loop.add_reader() или loop.add_writer(), но тогда я не мог найти, как получать уведомления, когда сокет закрыт.

Это минимальная программа, которая показывает, чтопроисходит с моей программой:

import asyncio
import socket

async def kill_later(c):
    await asyncio.sleep(0.1)
    c.close()

async def main():
    loop = asyncio.get_running_loop()

    c = socket.create_connection(('www.google.com', 80))
    c.setblocking(0)

    ev = asyncio.Event()
    loop.add_reader(c, ev.set)

    # Closes the socket after 0.1 ms:
    asyncio.create_task(kill_later(c))

    print("waiting...")

    #### ↓ THIS WAITS FOREVER ↓ ####
    await ev.wait()

asyncio.run(main())

Мой вопрос: как получить уведомление о закрытии сокета с помощью цикла asyncio?

РЕДАКТИРОВАТЬ: из-за широкого спроса, сделал сокет не-блокирование, но это не имеет значения, потому что add_reader() не пытается выполнить какой-либо ввод-вывод на сокете, просто следит за тем, когда он будет готов.

Ответы [ 2 ]

1 голос
/ 30 мая 2019

Ваша тестовая программа имеет недостатки. Вызов c.close() не эмулирует сокет, закрываемый другим концом, он закрывает ваш собственный файловый дескриптор и делает его недоступным. Вы можете думать о close(fd) как о разрыве связи между номером fd и базовым ресурсом ОС. После этого чтение и опрос fd теряют смысл, поскольку число больше ни на что не ссылается. В результате epoll() не может и не будет сообщать о закрытом дескрипторе файла как о «читаемом».

Чтобы проверить условие, которое вы хотите проверить, используйте другой конец , чтобы закрыть соединение. Самый простой способ сделать это - создать другой процесс или поток как фиктивный сервер. Например:

import asyncio, threading, socket, time

def start_mock_server():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('localhost', 10000))
    s.listen(1)
    def serve():
        conn, addr = s.accept()
        time.sleep(1)
        conn.close()
        s.close()
    threading.Thread(target=serve).start()

async def main():
    loop = asyncio.get_running_loop()
    start_mock_server()

    c = socket.create_connection(('localhost', 10000))
    c.setblocking(0)

    ev = asyncio.Event()
    loop.add_reader(c.fileno(), ev.set)

    print("waiting...")
    await ev.wait()
    print("done")

asyncio.run(main())
0 голосов
/ 30 мая 2019

Проблема, по крайней мере с вашим примером, в том, что ваш сокет никогда ничего не получает, следовательно, событие никогда не устанавливается.Документы упоминаются в примере :

Подождите, пока файловый дескриптор не получит некоторые данные, используя метод loop.add_reader (), а затем закройте цикл обработки событий

Чтобы ваш пример работал, вам сначала нужно отправить запрос в Google:

c = socket.create_connection(('www.google.com', 80))
c.sendall("GET /\r\n".encode())

Это установит событие, которое вы await позже в своем main coro.

epoll считает файловый дескриптор готовым к чтению, если по нему становятся доступными данные или если передается EOF, как описано в в этом ответе .

add_writer() не блокирует в такой ситуации, поскольку дескриптор файла считается готовым к записи, если во входном буфере все еще есть свободное место.

Вызов recv(), как я упоминал в предыдущей редакции моего ответа, не требуется.

...