python asyn c l oop для отправки полезной нагрузки udp каждые 15 секунд и прекращения попыток через 60 секунд - PullRequest
1 голос
/ 25 мая 2020

Моя цель - создать асинхронный c l oop, который подключается к IP-адресам UDP и получает ответ. Однако, если ответ не получен в течение 15 секунд, повторно отправьте запрос и прекратите попытки через 60 секунд.

main.py

async def main():
    tasks = []
    for i, l in zip(result["tr"], range(len(result["tr"]))):
        task = create_task(udp.connect(urlparse(i).hostname, urlparse(i).port))
        tasks.append(task)

    done, pending = await wait(tasks)
    for i in done:
        print(i.result())

udpClient.py

async def connect(udp, port):
    payload = b'sample_data_input'
    on_conn_lost, datagram_data = loop.create_future(), loop.create_future()

    transport, protocol = await self.default_event_loop.create_datagram_endpoint(
        lambda: ProtocolFactoryUDP(payload, on_conn_lost, datagram_data),
        remote_addr=(str(udp), int(port)))

    try:
        await on_conn_lost
        return datagram_data
    except Exception as e:
        print(e)

ProtocolFactoryUDP -> Это то же самое, что и в примере на python .org

class ProtocolFactoryUDP:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None
        self.datagram_data = datagram_data

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())
        self.datagram_data.set_result((data, addr))
        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)

В настоящее время этот код позволяет мне подключаться к целевым объектам UDP и распечатывать ответ.

Однако это не достигает моей цели, потому что, если сервер ничего не отвечает / отвечает слишком долго, udpClient просто ждет там вечно. Как я могу создать asyn c l oop, чтобы, если ответ не был получен в течение 15 секунд, повторно отправить запрос и завершить работу с пустой полезной нагрузкой (или завершить работу корректно) после 60 секунд попытки.

Редактировать 1 с обновленным кодом от paul

main.py

for i, l in zip(result["tr"], range(len(result["tr"]))):
    task = create_task(udp.connect(urlparse(i).hostname, urlparse(i).port))
    tasks.append(task)

for f in as_completed(tasks):
    result = await f

udpClient.py

...
for _ in range(4):
    try:
        transport, protocol = await self.default_event_loop.create_datagram_endpoint(
            lambda: ProtocolFactoryUDP(payload, on_conn_lost, datagram_data),
            remote_addr=(str(udp), int(port)))

        await asyncio.wait_for(on_conn_lost, 15.0)  # -> Met with CancelledError here.. 
        return datagram_data
    except TimeoutError:
        pass

1 Ответ

1 голос
/ 26 мая 2020

Вы дали прекрасное описание того, что вы пытаетесь сделать, и часто это самая сложная часть. Чтобы сократить его до кода, вы можете посмотреть на функцию asyncio.wait_for, а также на функцию asyncio.as_completed. Вот набросок того, что, я думаю, вам нужно. Я оставляю вам детали UDP.

Функция wait_for принимает значение тайм-аута (15 секунд в вашем случае). Вы хотите попробовать 4 раза, поэтому все, что вам нужно, это просто для l oop. Вы ловите TimeoutError и повторяете цикл; позвольте другим ошибкам распространиться обратно на вызывающую сторону. Если вы не получите тайм-аут, вы просто вернетесь, что завершит задачу.

Вы можете распечатать результаты по мере их завершения или дождаться их завершения с помощью asyncio.gather (return_exceptions = True).

async def main():
    tasks = []
    for i, l in zip(result["tr"], range(len(result["tr"]))):
        coro = my_task(udp.connect(urlparse(i).hostname, urlparse(i).port))
        tasks.append(asyncio.create_task(coro))

    for i in asyncio.as_completed(tasks):
        print(i.result())

async def my_task(host, port):
    for _ in range(4):
        try:
            await asyncio.wait_for(do_something_with_udp(host, port), 15.0)
            return
        except asyncio.TimeoutError:
            pass
...