Asyncio через несколько долгоживущих клиентских подключений - PullRequest
0 голосов
/ 21 февраля 2020

Я пытаюсь написать простое приложение, которое подключается к нескольким элементарным TCP-серверам с долгоживущими соединениями, где я отправляю / получаю данные от каждого соединения. В основном получение событий от сервера, отправка команд обратно и получение результатов от команд. Подумайте об управлении устройством, но я хочу управлять N устройствами в одном потоке.

У меня есть работающая не асинхронная c, неблокирующая реализация, но time.sleep () убивает отзывчивость или убивает процессор, а использование select намного круче.

Учитывая приведенный ниже пример, я хочу подключиться ко всем трем серверам, и await receive_message на каждом из них одновременно. В настоящее время он заблокирован в connect в receive_message (), поэтому я получаю только такой вывод:

Connecting to 1
Sending password to 1
Waiting for message from 1

Я бы хотел получить что-то похожее на это, не совсем, но показывая, что соединения все независимо запланированы.

Connecting to 1
Connecting to 2
Connecting to 3
Sending password to 1
Sending password to 2
Sending password to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3
Connected to 1
Connected to 2
Connected to 3
Waiting for message from 1
Waiting for message from 2
Waiting for message from 3

Разбитая версия того, что я пытаюсь. Нет, настоящий сервер не так уж небезопасен ... это всего лишь пример.

import asyncio


class Connection:
    def __init__(self, name, host, port):
        self.name = name
        self.host = host
        self.port = port
        self.reader, self.writer = None, None
        self.connected = False

    async def connect(self):
        print(f'Connecting to {self.name}')
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
        await self.send_message('password')
        response = await self.receive_message()
        if response == 'SUCCESS':
            self.connected = True
            print(f'Connected to {self.name}')
        else:
            raise Exception('unsuccessful connection')
        print(f'Connected to {self.name}')

    async def send_message(self, message):
        print(f'Sending {message} to {self.name}')
        self.writer.write(f'{message}\n'.encode('utf-8'))

    async def receive_message(self):
        print(f'Waiting for message from {self.name}')
        return (await self.reader.readline()).decode('utf-8')


connections = (
    Connection(1, 'localhost', 21114),
    Connection(2, 'localhost', 21115),
    Connection(3, 'localhost', 21116)
)


async def run():
    for connection in connections:
        await connection.connect()
    # how to receive messages from each connection as they are received
    for connection in connections:
        await connection.receive_message()

asyncio.run(run())

1 Ответ

1 голос
/ 21 февраля 2020

await в for l oop эффективно сериализует ваши соединения. В асинхронном параллелизме происходит на уровне задача , поэтому, если вы хотите, чтобы соединения выполнялись параллельно, вам нужно создать несколько задач (или использовать функцию, которая сделает это за вас, например asyncio.gather). ). Например:

async def handle_connection(conn):
    await conn.connect()
    await conn.receive_message()
    # ...

async def run():
    tasks = []
    for conn in connections:
        tasks.append(asyncio.create_task(handle_connection(conn)))
    # wait until all the tasks finish
    await asyncio.gather(*tasks)
...