Python 3 - несколько соединений AsyncIO - PullRequest
0 голосов
/ 04 декабря 2018

Я пытаюсь научиться использовать AsyncIO в Python 3.7, и меня все еще немного смущают его принципы.

Моя цель - написать простую программу чата, однако мне нужно использовать кольцевую сетьтопология - один узел знает только о двух своих соседях.Когда сообщение отправляется, оно передается узлами, пока оно не достигает отправителя снова.Это означает, что каждый узел в основном является клиентом и сервером одновременно.

Мне также нужно иметь возможность обнаруживать мертвые узлы, чтобы мое кольцо не разрывалось.

Я думалдля каждого узла может быть хорошим решением иметь отдельное соединение для каждого соседа - successor и predecessor.

class Node:
    ...
    def run():
        ...
        s = loop.create_connection(lambda: Client(...), addr1, port1)
        p = loop.create_server(lambda: Server(...), addr2, port2)
        successor = loop.run_until_complete(s)
        predecessor = loop.run_until_complete(p)
        loop.run_forever()
        ...
    ...

Server и Client - это классы, которые реализуют asyncio.Protocol.

Причина, по которой я хотел сделать это, заключается в том, что если сообщение отправляется через круг, оно всегда отправляется с predecessor на successorconnection_lost методе predecessor я могу обнаружить, что он отключен, и отправить его predecessor сообщение (через все кольцо) для соединения со мной.

Я хотел бы иметь возможность отправить сообщение, полученное от моего predecessor, далее на мой successor.Я также хотел бы иметь возможность отправить сообщение с моим адресом на мой successor в случае, если мой predecessor умрет (это сообщение будет отправлено с predecessor 'Server.connection_lost() и будет передано полностью моемуdead predecessor s predecessor).

Мой вопрос: Могу ли я передать полученные данные из predecessor в successor?Если нет, что будет лучшей реализацией этой программы, которая использует AsyncIO и топологию кольца?

1 Ответ

0 голосов
/ 16 декабря 2018

Для любого новичка в AsyncIO, имеющего ту же проблему, я сам нашел решение.

Прежде всего, лучше использовать аспекты высокого уровня AsyncIO - streams.Вызов loop.create_connction и loop.create_server считается низкоуровневым (что я сначала понял неправильно).

Высокоуровневой альтернативой create_connection является asyncio.open_connection, которая предоставит вам кортеж, состоящий из asyncio.StreamReader и asyncio.StreamWriter, который вы можете использовать для чтения и записи в открытое соединение.Вы также можете обнаружить потерю соединения, когда данные, считанные из StreamReader, равны b'' или когда вы перехватываете исключение (ConnectionError) при попытке записи в StreamWriter.

Высокоуровневой альтернативой create_server является asyncio.start_server, которая должна быть снабжена функцией обратного вызова, которая будет вызываться каждый раз, когда устанавливается соединение с сервером (открытое соединение, полученные данные ...).Обратный вызов имеет StreamReader и StreamWriter в качестве аргументов.Потеря соединения также может быть обнаружена при получении b'' или ConnectionError при записи в writer.

С помощью сопрограмм можно обработать несколько соединений.Может быть сопрограмма для серверной части (которая принимает соединение от одного из соседей в топологии кольца) и сопрограмма для клиентской части (которая открывает соединение с другим соседом в кольце).Класс Node может выглядеть следующим образом:

import asyncio

class Node:
    ...
    async def run(self):
        ...
        self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
        server_coro = asyncio.create_task(self.server_init())
        client_coro = asyncio.create_task(self.client_method())
        await client_coro
        await server_coro
        ...

    async def server_init(self):
        server = await asyncio.start_server(self.server_callback, self.IP, self.port)
        async with server:
            await server.serve_forever()

    async def client_method(self):
        ...
        try:
            data = await self.next_reader.read()
        except ConnectionError:
            ...
    ...

Обратите внимание, что я использую asyncio.create_task для сопрограмм и (не здесь в списке кода) asyncio.run(node.run()), которые считаются альтернативами высокого уровняasyncio.ensure_future() и loop.run_forever().Оба из них были добавлены в Python 3.7, и asyncio.run(), как говорят, является предварительным, так что к тому времени, когда вы прочитаете это, это уже могло быть заменено чем-то другим.

Я не эксперт AsyncIO,так что может быть лучший, более чистый способ сделать это (если вы знаете это, пожалуйста, поделитесь им).

...