asyncio tcp socket: как отменить asyncio.sleep (), когда сокет закрыт? - PullRequest
0 голосов
/ 10 октября 2019

В настоящее время я реализую протокол сокета TCP. Протокол требует отправки сообщений сердцебиения каждые пять минут. Я реализую протокол с использованием Asyncio в Python. Исходный код ниже - это программа, которая подключается к localhost: 8889, отправляет привет и отключает сокет через 1 секунду. В этом случае соединение отключается через одну секунду (если это действительно происходит, сеть отключена или сервер отключен). Проблема в том, что функция send_heartbeat ждет 5 минут, не зная, что сокет не работает. Я хотел бы немедленно отменить сопрограмму вместо ожидания 5 минут, когда розетка отключена. Какой лучший способ сделать это?

import asyncio


async def run(host: str, port: int):
    while True:
        try:
            reader, writer = await asyncio.open_connection(host, port)

        except OSError as e:
            print('connection failed:', e)
            await asyncio.sleep(0.5)
            continue

        await asyncio.wait([
            handle_stream(reader, writer),
            send_heartbeat(reader, writer),
        ], return_when=asyncio.FIRST_COMPLETED)  # will stop after 1 second

        writer.close()  # close socket after 1 second
        await writer.wait_closed()


async def handle_stream(reader, writer):
    writer.write(b'hello\n')  # will success because socket is alive
    await writer.drain()

    await asyncio.sleep(1)


async def send_heartbeat(reader, writer):
    while True:
        await asyncio.sleep(300)

        heartbeat_message = b'heartbeat\n'
        writer.write(heartbeat_message)  # will fail because socket is already closed after 1 second
        await writer.drain()


if __name__ == '__main__':
    asyncio.run(run('127.0.0.1', 8889))

1 Ответ

2 голосов
/ 10 октября 2019

Вы можете отменить режим сна, отменив задачу, которая его выполняет. Создание send_heartbeat в качестве отдельной задачи гарантирует, что она будет выполняться параллельно handle_stream, пока вы ожидаете последнего:

async def run(host: str, port: int):
    while True:
        ...
        heartbeat = asyncio.create_task(send_heartbeat(reader, writer))
        try:
            await handle_stream(reader, writer)
        finally:
            heartbeat.cancel()
            writer.close()
        await writer.wait_closed()

Кстати, так как вы ожидаете writer.drain() внутри handle_stream, естьнет гарантии, что handle_stream всегда будет завершен в течение 1 секунды. Это может быть место, где вы можете избежать утечки, или вы можете использовать asyncio.wait_for, ожидая handle_stream(...).

...